cauldron/cauldron/forge.py
Kayos d649b99aef v0.3 step 5: lean shopping list — claude on-demand foods + game strip
Two changes:

1. foods catalog grows organically. Switch the canonical seed from the
   noisy USDA dump (2462 rows of "'s, classic chicken noodle soup")
   to the Sonnet-curated cut (229 clean rows). search_food() is now
   exact + case-insensitive — Mealie's parser already canonicalizes
   food names household-side, so cauldron just needs to look them up
   verbatim. On miss, the /list view calls forge.fetch_food_info() to
   ask Sonnet for {density_g_per_ml, default_unit_class, common_size_g,
   category}, persists the row with source='claude', and the household's
   actual kitchen catalog builds itself out as Abby uses it.

   Killer case verified end-to-end: "2 cups + 50g + 1.25 lb rice"
   collapses to a single "2.25 lb rice" line on the shopping list once
   rice has a density row.

2. Game system stripped from /plan. Scoreboard panel, streak banner,
   "first to lock takes the week" / "🏆 you locked this one in" copy
   all gone. award_pick_points calls in /api/plan/generate +
   /api/plan/regenerate stopped firing. household_scoreboard /
   household_streak DB methods kept as dead code; cauldron_pick_points
   table left in place — non-destructive, easy to revive later if
   gamification comes back. Goal: get the base flow (pick → plan →
   list) working for Abby first, layer features on after.
2026-04-29 22:02:20 -07:00

309 lines
12 KiB
Python

"""Thin HTTP client for clawdforge — we're a consumer."""
import json
import re
import requests
class ForgeError(RuntimeError):
pass
_DAYS = ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday")
class Forge:
def __init__(self, *, base_url: str, token: str, default_model: str, default_timeout: int):
self.base_url = base_url.rstrip("/")
self.token = token
self.default_model = default_model
self.default_timeout = default_timeout
def _headers(self) -> dict:
return {"Authorization": f"Bearer {self.token}"}
def healthz(self) -> dict:
r = requests.get(f"{self.base_url}/healthz", headers=self._headers(), timeout=10)
r.raise_for_status()
return r.json()
def run(
self,
prompt: str,
*,
model: str | None = None,
system: str | None = None,
files: list[str] | None = None,
timeout_secs: int | None = None,
) -> dict:
"""POST /run. Returns parsed result dict on success.
Raises ForgeError on transport or upstream failure. The 'result' field
in the return is whatever clawdforge parsed out of `claude -p` — usually
a dict (when the prompt asked for JSON), occasionally a string.
"""
body = {"prompt": prompt, "model": model or self.default_model}
if system:
body["system"] = system
if files:
body["files"] = files
if timeout_secs:
body["timeout_secs"] = timeout_secs
# HTTP timeout = subprocess timeout + a 30s margin so we don't bail
# while clawdforge is still doing work for us.
http_timeout = (timeout_secs or self.default_timeout) + 30
try:
r = requests.post(
f"{self.base_url}/run",
headers=self._headers(),
json=body,
timeout=http_timeout,
)
except requests.RequestException as e:
raise ForgeError(f"transport: {e}") from e
if r.status_code >= 400:
raise ForgeError(f"upstream {r.status_code}: {r.text[:500]}")
return r.json()
def generate_plan(
self,
*,
picks: list[dict],
recipes: list[dict],
slots: int = 7,
week_start: str,
model: str | None = None,
) -> list[dict]:
"""Ask Sonnet for a {slots}-day plan. Returns a list of slot dicts
shaped like:
{"day": "monday", "recipe_slug": "...", "recipe_name": "...",
"picker_subs": [...], "reason": "...", "source": "pick"|"mealie"}
Validates structure aggressively — wrong shape / wrong slot count /
slug-not-in-pool → ForgeError. Caller surfaces a 502 to the user.
recipes: [{slug, name, tags?}], picks: [{slug, name, picker_subs}].
Picks are the family's pinned recipes; the prompt mandates each one
appears exactly once when the pool is large enough.
"""
if slots < 1 or slots > 14:
raise ForgeError(f"bad slot count: {slots}")
if not recipes:
raise ForgeError("recipe pool empty — cannot generate")
# Build a slug → name map for validation. Use the recipe pool plus
# picks (picks should already be in the pool, but be defensive).
valid_by_slug: dict[str, str] = {}
for r in recipes:
slug = r.get("slug")
if slug:
valid_by_slug[slug] = r.get("name") or slug
for p in picks:
slug = p.get("slug")
if slug:
valid_by_slug.setdefault(slug, p.get("name") or slug)
prompt = self._build_plan_prompt(
picks=picks, recipes=recipes, slots=slots, week_start=week_start,
)
result = self.run(prompt, model=model or "sonnet")
parsed = _extract_plan_slots(result)
if not isinstance(parsed, list):
raise ForgeError("model output: 'slots' must be a list")
if len(parsed) != slots:
raise ForgeError(f"model output: got {len(parsed)} slots, expected {slots}")
# Pick attribution lookup keyed by slug → list[sub]
pick_subs_by_slug: dict[str, list[str]] = {}
for p in picks:
slug = p.get("slug")
if slug:
pick_subs_by_slug[slug] = list(p.get("picker_subs") or [])
out = []
seen_days: set[str] = set()
for raw in parsed:
if not isinstance(raw, dict):
raise ForgeError("model output: each slot must be an object")
day = (raw.get("day") or "").strip().lower()
slug = (raw.get("recipe_slug") or "").strip()
if day not in _DAYS:
raise ForgeError(f"model output: bad day '{day}'")
if day in seen_days:
raise ForgeError(f"model output: duplicate day '{day}'")
seen_days.add(day)
if not slug or slug not in valid_by_slug:
raise ForgeError(f"model output: unknown recipe_slug '{slug}'")
# Trust the model's picker_subs only if they intersect the real
# set. We have ground truth in pick_subs_by_slug — prefer it.
real_pickers = pick_subs_by_slug.get(slug, [])
model_pickers = raw.get("picker_subs") or []
if not isinstance(model_pickers, list):
model_pickers = []
picker_subs = real_pickers if real_pickers else [
s for s in model_pickers if isinstance(s, str)
]
source = "pick" if real_pickers else "mealie"
out.append({
"day": day,
"recipe_slug": slug,
"recipe_name": valid_by_slug[slug],
"picker_subs": picker_subs,
"reason": (raw.get("reason") or "")[:500],
"source": source,
})
return out
@staticmethod
def _build_plan_prompt(*, picks, recipes, slots, week_start) -> str:
pool_lines = []
for r in recipes:
slug = r.get("slug") or ""
name = r.get("name") or slug
tags = r.get("tags") or []
tag_str = ""
if tags:
# First 3 tags only — keeps prompt token count under control
cleaned = []
for t in tags[:3]:
if isinstance(t, dict):
cleaned.append(t.get("name") or "")
elif isinstance(t, str):
cleaned.append(t)
cleaned = [c for c in cleaned if c]
if cleaned:
tag_str = f" [{', '.join(cleaned)}]"
pool_lines.append(f"- {slug}: {name}{tag_str}")
pick_lines = []
for p in picks:
slug = p.get("slug") or ""
name = p.get("name") or slug
pickers = p.get("pickers") or []
picker_subs = p.get("picker_subs") or []
who = ", ".join(pickers) if pickers else "household"
subs_repr = json.dumps(picker_subs)
pick_lines.append(f"- {slug}: {name} (picked by [{who}], picker_subs={subs_repr})")
picks_block = "\n".join(pick_lines) if pick_lines else "(none)"
pool_block = "\n".join(pool_lines)
return (
f"You are a family meal planner. Build a {slots}-day dinner plan "
f"for the week of {week_start}.\n\n"
f"POOL (all available recipes):\n{pool_block}\n\n"
f"PICKS (recipes the family pre-selected — every pick MUST appear "
f"if pool size >= {slots}; no repeats unless pool < {slots}):\n"
f"{picks_block}\n\n"
"Output JSON ONLY, no prose: "
'{"slots": [{"day": "monday", "recipe_slug": "...", '
'"picker_subs": [...] or [], "reason": "..."}, ...]}\n\n'
"Rules:\n"
f"- Use exactly {slots} recipes\n"
"- Distribute picks evenly across the week — don't bunch them\n"
"- \"reason\" is a one-line user-facing rationale "
"(e.g., \"balances heavy and light meals\", \"honors abby's pick\")\n"
"- \"picker_subs\" is the array of authentik_sub strings of family "
"members who picked this recipe (empty list if AI-chosen)\n"
"- Day order: monday..sunday\n"
)
def fetch_food_info(self, name: str, *, model: str | None = None) -> dict:
"""Ask Sonnet for density + unit class + common size of a single
food. Returns a dict shaped like:
{"density_g_per_ml": 1.04 | null,
"default_unit_class": "mass"|"volume"|"count",
"common_size_g": 150.0 | null,
"category": "produce"|"dairy"|... | null}
density_g_per_ml is null when the food doesn't sensibly convert
between mass and volume (e.g., whole onions, eggs — these are
count-style). common_size_g lets the aggregator handle "1 onion"
as a count → mass conversion. Cheap call, cached forever once
persisted to cauldron_foods.
"""
prompt = (
f"Give nutritional/cooking metadata for the food: {name!r}.\n\n"
"Output JSON ONLY, no prose: "
'{"density_g_per_ml": float|null, '
'"default_unit_class": "mass"|"volume"|"count", '
'"common_size_g": float|null, '
'"category": "produce"|"dairy"|"meat"|"grain"|"baking"|"pantry"'
'|"spice"|"oil"|"beverage"|"other"|null}\n\n'
"Rules:\n"
"- density_g_per_ml: typical packed/cooking density. Null if "
"the food is count-based (whole onions, eggs).\n"
"- default_unit_class: how this food is most often measured "
"(salt=mass; milk=volume; egg=count).\n"
"- common_size_g: the typical mass of one whole unit (1 onion "
"≈ 150g; 1 egg ≈ 50g). Null if the food isn't naturally counted.\n"
"- category: best single fit; null if uncertain.\n"
)
result = self.run(prompt, model=model or "sonnet", timeout_secs=60)
return _extract_food_info(result)
def _extract_food_info(forge_result: dict) -> dict:
"""Normalize clawdforge wrapper → food info dict. Defensive on shapes."""
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")
inner = forge_result.get("result", forge_result)
if isinstance(inner, str):
inner = _parse_json_blob(inner)
if not isinstance(inner, dict):
raise ForgeError(f"forge result not a dict: {str(inner)[:200]}")
cls = (inner.get("default_unit_class") or "mass").strip().lower()
if cls not in ("mass", "volume", "count", "mixed"):
cls = "mass"
def _f(v):
if v is None:
return None
try:
x = float(v)
return x if x > 0 else None
except (TypeError, ValueError):
return None
return {
"density_g_per_ml": _f(inner.get("density_g_per_ml")),
"default_unit_class": cls,
"common_size_g": _f(inner.get("common_size_g")),
"category": (inner.get("category") or None) and str(inner["category"])[:64],
}
def _extract_plan_slots(forge_result: dict):
"""clawdforge wraps its return; the JSON we asked for can sit in a few
different shapes. Normalize aggressively."""
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")
inner = forge_result.get("result", forge_result)
# `result` may be a string when claude returned non-JSON — try to scrape
if isinstance(inner, str):
inner = _parse_json_blob(inner)
if isinstance(inner, dict) and "slots" in inner:
return inner["slots"]
if isinstance(inner, list):
return inner
raise ForgeError(f"forge result missing 'slots' key: {str(inner)[:200]}")
def _parse_json_blob(s: str):
s = s.strip()
# Strip code fences if Sonnet wrapped its output
s = re.sub(r"^```(?:json)?\s*", "", s)
s = re.sub(r"\s*```$", "", s)
try:
return json.loads(s)
except Exception as e:
raise ForgeError(f"could not parse model JSON: {e}; head={s[:200]!r}") from e