cauldron/cauldron/forge.py
Kayos 10849e0e95 recipe enrichment: per-recipe Sonnet meta for smarter planning
The 'fancy data fun' Cobb wanted: pre-compute structured metadata for
every recipe so the plan generator can match preferences to actual
recipe characteristics, not just match keywords on names.

Sonnet returns per recipe:
  - tags[]: curated descriptors (high-protein, weeknight, one-pan,
    leftovers-good, kid-friendly, etc — picks 3-8 that genuinely apply)
  - cuisine, complexity (easy/medium/involved), estimated_minutes
  - meal_type (breakfast/lunch/dinner/snack/dessert/side/sauce/drink)
  - primary_protein (chicken/beef/pork/fish/seafood/tofu/...)
  - primary_carb (rice/pasta/bread/potato/tortilla/quinoa/...)
  - veg_forward (veg-forward/mixed/meat-forward)
  - comfort_tier (weeknight-easy/hearty-comfort/fancy-occasion/...)
  - season_fit[] + summary one-liner + best_for short phrase

Schema:
- Migration 024: cauldron_recipe_meta keyed by (household_id, recipe_slug),
  meta_json + enrich_version (bumping the version invalidates the cache
  and forces re-walk). One row per Mealie recipe Cobb owns.
- Migration 025: cauldron_enrich_jobs — job runner state. No
  proposals/review needed since metadata is purely additive.

Forge:
- enrich_recipe(recipe) builds a compact prompt with name + description
  + ingredients + steps (capped at 2000 chars total) + yields, asks
  Sonnet for the structured blob. _extract_recipe_meta validates and
  coerces types.

Module enrich_recipes.py:
- Daemon thread runner, walks all household recipes, skips already-
  enriched at current ENRICH_VERSION (idempotent), respects external
  cancel + stuck-job recovery. Skips cross-household recipes (Lake
  Elsinore stuff visible but not enrichable).

Plan generator hookup:
- /api/plan/generate + regenerate now pulls cauldron_recipe_meta and
  splices it into the recipe pool prompt. Each pool line goes from:
      - chicken-stir-fry: Chicken Stir Fry  [asian]
  to:
      - chicken-stir-fry: Chicken Stir Fry  [asian · easy · 30min ·
        protein:chicken · carb:rice · high-protein/weeknight/one-pan]
        quick weeknight stir-fry with leftover-friendly portions
  Sonnet now has rich attributes to actually match a 'high protein
  week' or 'comfort food' or 'quick' preference against, instead of
  guessing from titles.

Endpoints:
- /enrich-recipes UI page (progress bar + start + force re-enrich +
  cancel; no review/approve since meta is additive)
- /api/recipes/enrich-{start,status,cancel} session-authed
- /api/admin/recipes/enrich-start bearer-authed for kayos kick-off

Cost (one-time): ~5s/recipe × 226 = ~20 min walk. Subsequent runs
only process new/changed recipes.
2026-04-30 20:08:20 -07:00

686 lines
30 KiB
Python

"""Thin HTTP client for clawdforge — we're a consumer."""
import json
import re
import requests
class ForgeError(RuntimeError):
pass
_DAYS = ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday")
class Forge:
def __init__(self, *, base_url: str, token: str, default_model: str, default_timeout: int):
self.base_url = base_url.rstrip("/")
self.token = token
self.default_model = default_model
self.default_timeout = default_timeout
def _headers(self) -> dict:
return {"Authorization": f"Bearer {self.token}"}
def healthz(self) -> dict:
r = requests.get(f"{self.base_url}/healthz", headers=self._headers(), timeout=10)
r.raise_for_status()
return r.json()
def run(
self,
prompt: str,
*,
model: str | None = None,
system: str | None = None,
files: list[str] | None = None,
timeout_secs: int | None = None,
) -> dict:
"""POST /run. Returns parsed result dict on success.
Raises ForgeError on transport or upstream failure. The 'result' field
in the return is whatever clawdforge parsed out of `claude -p` — usually
a dict (when the prompt asked for JSON), occasionally a string.
"""
body = {"prompt": prompt, "model": model or self.default_model}
if system:
body["system"] = system
if files:
body["files"] = files
if timeout_secs:
body["timeout_secs"] = timeout_secs
# HTTP timeout = subprocess timeout + a 30s margin so we don't bail
# while clawdforge is still doing work for us.
http_timeout = (timeout_secs or self.default_timeout) + 30
try:
r = requests.post(
f"{self.base_url}/run",
headers=self._headers(),
json=body,
timeout=http_timeout,
)
except requests.RequestException as e:
raise ForgeError(f"transport: {e}") from e
if r.status_code >= 400:
raise ForgeError(f"upstream {r.status_code}: {r.text[:500]}")
return r.json()
def generate_plan(
self,
*,
picks: list[dict],
recipes: list[dict],
slots: int = 7,
week_start: str,
preference: str | None = None,
model: str | None = None,
) -> list[dict]:
"""Ask Sonnet for a {slots}-day plan. Returns a list of slot dicts
shaped like:
{"day": "monday", "recipe_slug": "...", "recipe_name": "...",
"picker_subs": [...], "reason": "...", "source": "pick"|"mealie"}
Validates structure aggressively — wrong shape / wrong slot count /
slug-not-in-pool → ForgeError. Caller surfaces a 502 to the user.
recipes: [{slug, name, tags?}], picks: [{slug, name, picker_subs}].
Picks are the family's pinned recipes; the prompt mandates each one
appears exactly once when the pool is large enough.
"""
if slots < 1 or slots > 14:
raise ForgeError(f"bad slot count: {slots}")
if not recipes:
raise ForgeError("recipe pool empty — cannot generate")
# Build a slug → name map for validation. Use the recipe pool plus
# picks (picks should already be in the pool, but be defensive).
valid_by_slug: dict[str, str] = {}
for r in recipes:
slug = r.get("slug")
if slug:
valid_by_slug[slug] = r.get("name") or slug
for p in picks:
slug = p.get("slug")
if slug:
valid_by_slug.setdefault(slug, p.get("name") or slug)
prompt = self._build_plan_prompt(
picks=picks, recipes=recipes, slots=slots, week_start=week_start,
preference=preference,
)
result = self.run(prompt, model=model or "sonnet")
parsed = _extract_plan_slots(result)
if not isinstance(parsed, list):
raise ForgeError("model output: 'slots' must be a list")
if len(parsed) != slots:
raise ForgeError(f"model output: got {len(parsed)} slots, expected {slots}")
# Pick attribution lookup keyed by slug → list[sub]
pick_subs_by_slug: dict[str, list[str]] = {}
for p in picks:
slug = p.get("slug")
if slug:
pick_subs_by_slug[slug] = list(p.get("picker_subs") or [])
out = []
seen_days: set[str] = set()
for raw in parsed:
if not isinstance(raw, dict):
raise ForgeError("model output: each slot must be an object")
day = (raw.get("day") or "").strip().lower()
slug = (raw.get("recipe_slug") or "").strip()
if day not in _DAYS:
raise ForgeError(f"model output: bad day '{day}'")
if day in seen_days:
raise ForgeError(f"model output: duplicate day '{day}'")
seen_days.add(day)
if not slug or slug not in valid_by_slug:
raise ForgeError(f"model output: unknown recipe_slug '{slug}'")
# Trust the model's picker_subs only if they intersect the real
# set. We have ground truth in pick_subs_by_slug — prefer it.
real_pickers = pick_subs_by_slug.get(slug, [])
model_pickers = raw.get("picker_subs") or []
if not isinstance(model_pickers, list):
model_pickers = []
picker_subs = real_pickers if real_pickers else [
s for s in model_pickers if isinstance(s, str)
]
source = "pick" if real_pickers else "mealie"
out.append({
"day": day,
"recipe_slug": slug,
"recipe_name": valid_by_slug[slug],
"picker_subs": picker_subs,
"reason": (raw.get("reason") or "")[:500],
"source": source,
})
return out
@staticmethod
def _build_plan_prompt(*, picks, recipes, slots, week_start, preference=None) -> str:
pool_lines = []
for r in recipes:
slug = r.get("slug") or ""
name = r.get("name") or slug
tags = r.get("tags") or []
meta = r.get("meta") or {}
extras: list[str] = []
# First 3 Mealie tags
if tags:
cleaned = []
for t in tags[:3]:
if isinstance(t, dict):
cleaned.append(t.get("name") or "")
elif isinstance(t, str):
cleaned.append(t)
cleaned = [c for c in cleaned if c]
if cleaned:
extras.append(", ".join(cleaned))
# Sonnet-generated meta — the actual high-signal stuff
if meta:
if meta.get("cuisine") and meta["cuisine"] not in ("unknown", "other"):
extras.append(meta["cuisine"])
if meta.get("complexity"):
extras.append(meta["complexity"])
em = meta.get("estimated_minutes")
if isinstance(em, int) and em > 0:
extras.append(f"{em}min")
if meta.get("primary_protein") and meta["primary_protein"] != "none":
extras.append(f"protein:{meta['primary_protein']}")
if meta.get("primary_carb") and meta["primary_carb"] != "none":
extras.append(f"carb:{meta['primary_carb']}")
if meta.get("veg_forward") and meta["veg_forward"] != "mixed":
extras.append(meta["veg_forward"])
meta_tags = meta.get("tags") or []
if meta_tags:
extras.append("/".join(meta_tags[:5]))
if meta.get("summary"):
# Inline 1-line summary helps Sonnet match preferences
summary = str(meta["summary"])[:140]
pool_lines.append(f"- {slug}: {name} [{' · '.join(extras)}]\n {summary}")
continue
extra_str = f" [{' · '.join(extras)}]" if extras else ""
pool_lines.append(f"- {slug}: {name}{extra_str}")
pick_lines = []
for p in picks:
slug = p.get("slug") or ""
name = p.get("name") or slug
pickers = p.get("pickers") or []
picker_subs = p.get("picker_subs") or []
who = ", ".join(pickers) if pickers else "household"
subs_repr = json.dumps(picker_subs)
pick_lines.append(f"- {slug}: {name} (picked by [{who}], picker_subs={subs_repr})")
picks_block = "\n".join(pick_lines) if pick_lines else "(none)"
pool_block = "\n".join(pool_lines)
pref_clean = (preference or "").strip()
pref_block = ""
if pref_clean:
pref_block = (
f"\nHOUSEHOLD PREFERENCE FOR THIS WEEK:\n \"{pref_clean}\"\n\n"
"When the preference is set, BIAS your AI-chosen slots toward "
"recipes from the pool that match it. The preference may describe "
"diet (\"high protein, low carb\"), occasion (\"light meals, "
"recovery week\"), shopping constraints (\"no fish, out of "
"season\"), or vibe (\"carb load, training hard\"). The "
"preference does NOT override picks — every pick still appears. "
"It DOES change which other recipes from the pool you choose to "
"fill the remaining slots.\n"
)
return (
f"You are a family meal planner. Build a {slots}-day dinner plan "
f"for the week of {week_start}.\n\n"
f"POOL (all available recipes):\n{pool_block}\n\n"
f"PICKS (recipes the family pre-selected — every pick MUST appear "
f"if pool size >= {slots}; no repeats unless pool < {slots}):\n"
f"{picks_block}\n"
f"{pref_block}"
"Output JSON ONLY, no prose: "
'{"slots": [{"day": "monday", "recipe_slug": "...", '
'"picker_subs": [...] or [], "reason": "..."}, ...]}\n\n'
"Rules:\n"
f"- Use exactly {slots} recipes\n"
"- Distribute picks evenly across the week — don't bunch them\n"
"- \"reason\" is a one-line user-facing rationale "
"(e.g., \"balances heavy and light meals\", \"honors abby's pick\", "
"\"high-protein lean — pairs with the gym week\")\n"
"- \"picker_subs\" is the array of authentik_sub strings of family "
"members who picked this recipe (empty list if AI-chosen)\n"
"- Day order: monday..sunday\n"
)
def recipe_dedupe_decision(
self, recipes: list[dict], *, model: str | None = None
) -> dict:
"""Ask Sonnet whether a cluster of similar-named recipes are
actually duplicates (same recipe imported twice / hand-copied with
a slight title tweak / etc) versus distinct recipes that just
happen to look similar by name.
Input: list of recipe summaries — {slug, name, source_url,
ingredient_summary (concise list), step_count, yields}.
Returns:
{"duplicates": bool,
"canonical_slug": "<slug to keep>",
"delete_slugs": ["<slug>", ...],
"reason": "<one-line explanation>"}
duplicates=false means the cluster is a false positive and nothing
should be deleted. canonical_slug + delete_slugs must be empty in
that case. Be conservative — when in doubt return false."""
items = [
{
"slug": r.get("slug"),
"name": r.get("name"),
"source_url": r.get("source_url") or "",
"ingredient_summary": r.get("ingredient_summary") or [],
"step_count": r.get("step_count") or 0,
"yields": r.get("yields") or "",
}
for r in recipes
]
prompt = (
"You are deciding whether a cluster of similar-named recipes "
"are actual duplicates (same recipe imported or hand-copied "
"twice) or distinct recipes that share words in the title.\n\n"
f"Cluster:\n{json.dumps(items, indent=2)}\n\n"
"Output JSON ONLY, no prose: "
'{"duplicates": true|false, '
'"canonical_slug": "<slug to keep, or empty>", '
'"delete_slugs": ["<slug>", ...], '
'"reason": "<one-line reasoning>"}\n\n'
"Rules:\n"
"- duplicates=true ONLY when the recipes are clearly the same "
" dish prepared the same way (matching ingredient sets, similar "
" step counts, often shared source_url). Slight title variations "
" ('Banana Bread' vs 'Best Banana Bread') with same body = dupes.\n"
"- Pick canonical_slug = the recipe with the cleanest name, the "
" most complete data (more steps + yields filled in beats less). "
" When tied, pick the older one (lexicographic slug order is fine "
" since Mealie slugs include date-ish suffixes for dupes).\n"
"- delete_slugs = the OTHER cluster members. Mealie DELETE removes "
" them permanently — only suggest deletion when you're confident.\n"
"- duplicates=false when ingredient sets differ meaningfully, OR "
" when names suggest distinct dishes ('Chicken Stir Fry' vs "
" 'Chicken Fajitas'), OR when you genuinely cannot tell.\n"
"- Be CONSERVATIVE — false negatives are recoverable (recipes "
" stay), false positives delete data."
)
result = self.run(prompt, model=model or "sonnet", timeout_secs=60)
return _extract_recipe_dedupe_decision(result)
def cluster_decision(
self, foods: list[dict], *, model: str | None = None
) -> dict:
"""Ask Sonnet whether a cluster of similar-named foods are
actually duplicates. Input: list of {id, name, plural_name?, aliases?}.
Returns:
{"merge": bool,
"canonical_id": "<id>", # the survivor (highest-quality name/aliases)
"canonical_name": "<str>", # the survivor's name (echoed for the UI)
"discard_ids": ["<id>", ...], # the ones to merge into canonical
"alias_additions": ["<name>", ...], # discarded names worth keeping as aliases on the survivor
"reason": "<one-line explanation>"}
merge=false means the cluster is a false positive (foods that look
similar but are distinct, e.g. "olive oil" vs "olive"). In that case
canonical_id may be empty and discard_ids must be empty.
"""
items = [
{
"id": f.get("id"),
"name": f.get("name"),
"plural_name": f.get("pluralName") or f.get("plural_name"),
"aliases": [
(a.get("name") if isinstance(a, dict) else a)
for a in (f.get("aliases") or [])
],
}
for f in foods
]
prompt = (
"You are deciding whether a cluster of food rows from a recipe "
"database are duplicates that should be merged into one canonical "
"row. The names came from years of recipe imports + manual entry "
"so plural/case/wording variations are common.\n\n"
f"Cluster:\n{json.dumps(items, indent=2)}\n\n"
"Output JSON ONLY, no prose: "
'{"merge": true|false, '
'"canonical_id": "<id of the survivor or empty>", '
'"canonical_name": "<survivor name or empty>", '
'"discard_ids": ["<id>", ...], '
'"alias_additions": ["<name to add as alias on survivor>", ...], '
'"reason": "<one-line reasoning>"}\n\n'
"Rules:\n"
"- Pick the survivor whose name is the cleanest canonical "
" (lowercase, singular when applicable, no brand, no clinical "
" qualifiers like 'raw' or 'unenriched').\n"
"- discard_ids are the OTHER cluster members — Mealie will rewrite "
" recipe references to point at canonical_id.\n"
"- alias_additions = the discarded NAMES (or any close variants you "
" noticed in plural_name/aliases) that the survivor should adopt as "
" aliases so the parser fuzzy-matches them in the future.\n"
"- merge=false ONLY when the cluster is a false positive (e.g. "
" 'olive oil' vs 'olive', 'butter' vs 'peanut butter'). In that "
" case canonical_id and discard_ids must both be empty.\n"
"- Be conservative — when in doubt, merge=false."
)
result = self.run(prompt, model=model or "sonnet", timeout_secs=60)
return _extract_cluster_decision(result)
def enrich_recipe(self, recipe: dict, *, model: str | None = None) -> dict:
"""Generate structured metadata for a recipe so the plan generator
can match preferences to actual recipe characteristics, not just
names.
Input: a Mealie recipe dict (uses name + description + ingredients
+ instructions + yields + recipeYield).
Output (validated):
{
"tags": [<curated descriptor strings>],
# e.g. "high-protein", "weeknight", "one-pan",
# "kid-friendly", "leftovers-good", "freezer-friendly"
"cuisine": "<american|italian|asian|mexican|...|other|unknown>",
"complexity": "easy|medium|involved",
"estimated_minutes": <int>,
"meal_type": "breakfast|lunch|dinner|snack|dessert|side",
"primary_protein": "<chicken|beef|pork|fish|tofu|beans|eggs|none|mixed>",
"primary_carb": "<rice|pasta|bread|potato|tortilla|quinoa|none|mixed>",
"veg_forward": "veg-forward|mixed|meat-forward",
"comfort_tier": "<weeknight-easy|comfort|fancy|kid-friendly|...>",
"season_fit": [<season strings>],
"summary": "<one-line vibe>",
"best_for": "<short phrase about when this is the right pick>"
}
Cheap call, idempotent — run once per recipe and cache forever
(or until enrich_version bumps)."""
# Build a compact recipe summary for the prompt
ings = recipe.get("recipeIngredient") or []
ing_lines: list[str] = []
for i in ings[:30]:
food = (i.get("food") or {}).get("name") if isinstance(i.get("food"), dict) else None
qty = i.get("quantity")
unit = (i.get("unit") or {}).get("name") if isinstance(i.get("unit"), dict) else None
note = i.get("note") or ""
line = ""
if qty not in (None, ""):
line += f"{qty} "
if unit:
line += f"{unit} "
if food:
line += food
elif note:
line += note
if line.strip():
ing_lines.append(line.strip())
instructions = recipe.get("recipeInstructions") or []
steps: list[str] = []
char_budget = 2000
for step in instructions:
if not isinstance(step, dict):
continue
text = (step.get("text") or "").strip()
if not text or char_budget <= 0:
continue
if len(text) > char_budget:
text = text[:char_budget] + ""
steps.append(text)
char_budget -= len(text)
prompt = (
"Given the following recipe, return structured metadata to help "
"an AI meal planner pick recipes that match user preferences "
"('high protein week', 'carb load', 'light recovery', etc).\n\n"
f"NAME: {recipe.get('name') or '(unnamed)'}\n"
f"DESCRIPTION: {(recipe.get('description') or '').strip()[:400]}\n"
f"YIELDS: {(recipe.get('recipeYield') or '').strip()[:80]}\n"
f"INGREDIENTS:\n - " + "\n - ".join(ing_lines or ['(none listed)']) + "\n"
f"STEPS:\n - " + "\n - ".join(steps or ['(none listed)']) + "\n\n"
"Output JSON ONLY, no prose:\n"
"{\n"
' "tags": [<curated descriptor strings — pick 3-8 from these or invent close variants: '
'"high-protein","low-carb","high-carb","low-fat","high-fiber",'
'"vegetarian","vegan","gluten-free","dairy-free","keto","paleo",'
'"weeknight","weekend","one-pan","one-pot","sheet-pan","slow-cooker","instant-pot",'
'"freezer-friendly","leftovers-good","kid-friendly","spicy","mild",'
'"hearty","light","fresh","comfort","fancy","quick","make-ahead">],\n'
' "cuisine": "<american|italian|asian|mexican|mediterranean|indian|french|middle-eastern|other|unknown>",\n'
' "complexity": "<easy|medium|involved>",\n'
' "estimated_minutes": <int total time including prep>,\n'
' "meal_type": "<breakfast|lunch|dinner|snack|dessert|side|sauce|drink>",\n'
' "primary_protein": "<chicken|beef|pork|fish|seafood|tofu|tempeh|beans|eggs|cheese|nuts|none|mixed>",\n'
' "primary_carb": "<rice|pasta|bread|potato|tortilla|quinoa|noodles|grain|none|mixed>",\n'
' "veg_forward": "<veg-forward|mixed|meat-forward>",\n'
' "comfort_tier": "<weeknight-easy|hearty-comfort|fancy-occasion|kid-friendly|date-night|crowd-pleaser>",\n'
' "season_fit": [<one or more of "spring","summer","fall","winter","year-round">],\n'
' "summary": "<one-line vibe — what KIND of meal is this>",\n'
' "best_for": "<short phrase: when is this the right pick>"\n'
"}\n\n"
"Rules:\n"
"- Return ONLY the JSON object, no markdown fences, no prose.\n"
"- Be concrete: 'high-protein' goes in tags ONLY if the recipe genuinely "
"qualifies (significant meat/eggs/dairy/protein source per serving).\n"
"- estimated_minutes: best guess from prep + cook implied by steps. Dishes "
"needing rise/marinade time count that time.\n"
"- complexity: 'easy' = ≤30 min + ≤7 ingredients + simple technique; "
"'medium' = 30-90 min OR moderate technique; 'involved' = >90 min OR "
"advanced technique (lamination, fermentation, multi-component).\n"
"- summary should describe the vibe / use-case, not just restate the name. "
"e.g. 'quick weeknight stir-fry with leftover-friendly portions' beats "
"'chicken stir fry with rice'.\n"
"- When uncertain on a categorical, use 'unknown' or 'other' rather than guessing."
)
result = self.run(prompt, model=model or "sonnet", timeout_secs=90)
return _extract_recipe_meta(result)
def fetch_food_info(self, name: str, *, model: str | None = None) -> dict:
"""Ask Sonnet for density + unit class + common size of a single
food. Returns a dict shaped like:
{"density_g_per_ml": 1.04 | null,
"default_unit_class": "mass"|"volume"|"count",
"common_size_g": 150.0 | null,
"category": "produce"|"dairy"|... | null}
density_g_per_ml is null when the food doesn't sensibly convert
between mass and volume (e.g., whole onions, eggs — these are
count-style). common_size_g lets the aggregator handle "1 onion"
as a count → mass conversion. Cheap call, cached forever once
persisted to cauldron_foods.
"""
prompt = (
f"Give nutritional/cooking metadata for the food: {name!r}.\n\n"
"Output JSON ONLY, no prose: "
'{"density_g_per_ml": float|null, '
'"default_unit_class": "mass"|"volume"|"count", '
'"common_size_g": float|null, '
'"category": "produce"|"dairy"|"meat"|"grain"|"baking"|"pantry"'
'|"spice"|"oil"|"beverage"|"other"|null}\n\n'
"Rules:\n"
"- density_g_per_ml: typical packed/cooking density. Null if "
"the food is count-based (whole onions, eggs).\n"
"- default_unit_class: how this food is most often measured "
"(salt=mass; milk=volume; egg=count).\n"
"- common_size_g: the typical mass of one whole unit (1 onion "
"≈ 150g; 1 egg ≈ 50g). Null if the food isn't naturally counted.\n"
"- category: best single fit; null if uncertain.\n"
)
result = self.run(prompt, model=model or "sonnet", timeout_secs=60)
return _extract_food_info(result)
def _extract_recipe_meta(forge_result: dict) -> dict:
"""Validate the recipe metadata blob from Sonnet. Coerces types,
normalizes enums to lowercase, drops fields not in the schema."""
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")
inner = forge_result.get("result", forge_result)
if isinstance(inner, str):
inner = _parse_json_blob(inner)
if not isinstance(inner, dict):
raise ForgeError(f"recipe meta not a dict: {str(inner)[:200]}")
def _str(v, default=""):
return str(v).strip().lower()[:64] if isinstance(v, str) and v.strip() else default
def _str_long(v, default=""):
return str(v).strip()[:300] if isinstance(v, str) and v.strip() else default
def _str_list(v) -> list[str]:
if not isinstance(v, list):
return []
out = []
for item in v:
if isinstance(item, str) and item.strip():
out.append(item.strip().lower()[:48])
return out[:12]
def _int(v, default=0):
try:
return max(0, int(v))
except (TypeError, ValueError):
return default
return {
"tags": _str_list(inner.get("tags")),
"cuisine": _str(inner.get("cuisine"), "unknown"),
"complexity": _str(inner.get("complexity"), "medium"),
"estimated_minutes": _int(inner.get("estimated_minutes")),
"meal_type": _str(inner.get("meal_type"), "dinner"),
"primary_protein": _str(inner.get("primary_protein"), "none"),
"primary_carb": _str(inner.get("primary_carb"), "none"),
"veg_forward": _str(inner.get("veg_forward"), "mixed"),
"comfort_tier": _str(inner.get("comfort_tier"), "weeknight-easy"),
"season_fit": _str_list(inner.get("season_fit")) or ["year-round"],
"summary": _str_long(inner.get("summary")),
"best_for": _str_long(inner.get("best_for")),
}
def _extract_recipe_dedupe_decision(forge_result: dict) -> dict:
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")
inner = forge_result.get("result", forge_result)
if isinstance(inner, str):
inner = _parse_json_blob(inner)
if not isinstance(inner, dict):
raise ForgeError(f"recipe dedupe decision not a dict: {str(inner)[:200]}")
duplicates = bool(inner.get("duplicates"))
canonical_slug = str(inner.get("canonical_slug") or "")
delete_raw = inner.get("delete_slugs") or []
delete_slugs = [str(x) for x in delete_raw if isinstance(x, str) and x.strip()]
reason = str(inner.get("reason") or "")[:500]
if not duplicates:
canonical_slug = ""
delete_slugs = []
return {
"duplicates": duplicates,
"canonical_slug": canonical_slug,
"delete_slugs": delete_slugs,
"reason": reason,
}
def _extract_cluster_decision(forge_result: dict) -> dict:
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")
inner = forge_result.get("result", forge_result)
if isinstance(inner, str):
inner = _parse_json_blob(inner)
if not isinstance(inner, dict):
raise ForgeError(f"cluster decision not a dict: {str(inner)[:200]}")
merge = bool(inner.get("merge"))
canonical_id = str(inner.get("canonical_id") or "")
canonical_name = str(inner.get("canonical_name") or "")
discard_raw = inner.get("discard_ids") or []
discard_ids = [str(x) for x in discard_raw if isinstance(x, (str, int))]
aliases_raw = inner.get("alias_additions") or []
alias_additions = [str(x) for x in aliases_raw if isinstance(x, str) and x.strip()]
reason = str(inner.get("reason") or "")[:500]
if not merge:
canonical_id = ""
discard_ids = []
return {
"merge": merge,
"canonical_id": canonical_id,
"canonical_name": canonical_name,
"discard_ids": discard_ids,
"alias_additions": alias_additions,
"reason": reason,
}
def _extract_food_info(forge_result: dict) -> dict:
"""Normalize clawdforge wrapper → food info dict. Defensive on shapes."""
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")
inner = forge_result.get("result", forge_result)
if isinstance(inner, str):
inner = _parse_json_blob(inner)
if not isinstance(inner, dict):
raise ForgeError(f"forge result not a dict: {str(inner)[:200]}")
cls = (inner.get("default_unit_class") or "mass").strip().lower()
if cls not in ("mass", "volume", "count", "mixed"):
cls = "mass"
def _f(v):
if v is None:
return None
try:
x = float(v)
return x if x > 0 else None
except (TypeError, ValueError):
return None
return {
"density_g_per_ml": _f(inner.get("density_g_per_ml")),
"default_unit_class": cls,
"common_size_g": _f(inner.get("common_size_g")),
"category": (inner.get("category") or None) and str(inner["category"])[:64],
}
def _extract_plan_slots(forge_result: dict):
"""clawdforge wraps its return; the JSON we asked for can sit in a few
different shapes. Normalize aggressively."""
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")
inner = forge_result.get("result", forge_result)
# `result` may be a string when claude returned non-JSON — try to scrape
if isinstance(inner, str):
inner = _parse_json_blob(inner)
if isinstance(inner, dict) and "slots" in inner:
return inner["slots"]
if isinstance(inner, list):
return inner
raise ForgeError(f"forge result missing 'slots' key: {str(inner)[:200]}")
def _parse_json_blob(s: str):
s = s.strip()
# Strip code fences if Sonnet wrapped its output
s = re.sub(r"^```(?:json)?\s*", "", s)
s = re.sub(r"\s*```$", "", s)
try:
return json.loads(s)
except Exception as e:
raise ForgeError(f"could not parse model JSON: {e}; head={s[:200]!r}") from e