plan: Flavor A — 🔮 suggest forgotten gems

Hecate looks through the household's library and surfaces 1-6 recipes
that fit the picker profiles but haven't been served in 90+ days (or
ever). Server-side filters drop recently-planned + already-on-this-week
+ already-on-picks-list + allergen-conflict before Sonnet sees the pool,
keeping the prompt focused. Each suggestion comes with a 1-5 fit score
and a one-line reason in Hecate's voice. Pin → adds to the user's picks
so the next /plan/generate will naturally pull it in. Skip → hides the
card.

Endpoints:
  POST /api/plan/suggest        body {count?: 1-6, week?: ISO}
  POST /api/plan/suggest/pin    body {recipe_slug}

Layered on top of 37d7d60 (parser fix, committed not deployed). Both
should land together once enrich job 3 drains.
This commit is contained in:
Kayos 2026-05-01 00:07:03 -07:00
parent 37d7d60a8b
commit d561a9373e
4 changed files with 509 additions and 0 deletions

View file

@ -908,6 +908,182 @@ class Forge:
return prior_contains or {}
return _extract_allergen_verification(result, prior_contains or {})
def suggest_recipes(
self,
*,
eligible_pool: list[dict],
count: int = 3,
week_start: str,
preference: str | None = None,
daily_targets: dict | None = None,
exclusions: list | None = None,
picker_profiles: dict | None = None,
in_plan_slugs: list[str] | None = None,
model: str | None = None,
) -> list[dict]:
"""Ask Hecate to nominate `count` 'forgotten gems' from an already-
filtered pool (caller has dropped recipes seen in the last 90 days
and recipes already in the current week's plan). Returns:
[{"recipe_slug": "...", "recipe_name": "...", "fit_score": 1-5,
"reason": "<one-line in Hecate's voice>"}]
Each suggestion's slug MUST be in the eligible pool — Forge raises
ForgeError if the model hallucinates one. The caller surfaces the
ForgeError as a 502."""
if count < 1 or count > 6:
raise ForgeError(f"bad suggestion count: {count}")
if not eligible_pool:
raise ForgeError("eligible pool empty — nothing to suggest")
# Need at least `count` items to pick from, otherwise just return what we have
target = min(count, len(eligible_pool))
valid_by_slug: dict[str, str] = {}
for r in eligible_pool:
slug = r.get("slug")
if slug:
valid_by_slug[slug] = r.get("name") or slug
# Render pool entries the same way the planner does (compact, meta-rich)
pool_lines = []
for r in eligible_pool:
slug = r.get("slug") or ""
name = r.get("name") or slug
meta = r.get("meta") or {}
extras: list[str] = []
if meta.get("cuisine") and meta["cuisine"] not in ("unknown", "other"):
extras.append(meta["cuisine"])
if meta.get("complexity"):
extras.append(meta["complexity"])
em = meta.get("estimated_minutes")
if isinstance(em, int) and em > 0:
extras.append(f"{em}min")
if meta.get("primary_protein") and meta["primary_protein"] != "none":
extras.append(f"protein:{meta['primary_protein']}")
if meta.get("primary_carb") and meta["primary_carb"] != "none":
extras.append(f"carb:{meta['primary_carb']}")
if meta.get("veg_forward") and meta["veg_forward"] != "mixed":
extras.append(meta["veg_forward"])
if meta.get("comfort_tier"):
extras.append(f"comfort:{meta['comfort_tier']}")
if meta.get("kid_friendly"):
extras.append(f"kid:{meta['kid_friendly']}")
meta_tags = meta.get("tags") or []
if meta_tags:
extras.append("/".join(meta_tags[:5]))
quip = meta.get("hecate_quip")
if quip:
extras.append(f"vibe:{str(quip)[:80]}")
h = r.get("history") or {}
if h.get("weeks_ago") is not None:
extras.append(f"last:{h['weeks_ago']}w-ago")
else:
extras.append("never-planned")
fit = r.get("fit") or {}
if isinstance(fit, dict) and fit:
fit_str = ",".join(f"{n}:{s}" for n, s in fit.items())
extras.append(f"fit:{fit_str}")
line = f"- {slug} | {name}"
if extras:
line += f" [{' · '.join(extras)}]"
pool_lines.append(line)
pool_block = "\n".join(pool_lines)
# Picker profile block (same shape as planner uses, abbreviated)
profile_block = ""
if isinstance(picker_profiles, dict) and picker_profiles:
lines = []
for sub, prof in picker_profiles.items():
if not isinstance(prof, dict):
continue
disp = prof.get("display_name") or sub
cuisines = list((prof.get("cuisines") or {}).keys())[:3]
proteins = list((prof.get("proteins") or {}).keys())[:3]
bits = []
if cuisines:
bits.append("cuisines=" + "/".join(cuisines))
if proteins:
bits.append("proteins=" + "/".join(proteins))
lines.append(f" - {disp}: " + ", ".join(bits) if bits else f" - {disp}: (no signal yet)")
if lines:
profile_block = "\nFAMILY PICKER PROFILES (their demonstrated tastes):\n" + "\n".join(lines) + "\n"
pref_block = f"\nWEEK PREFERENCE: {preference}\n" if preference else ""
excl_block = ""
if isinstance(exclusions, list) and exclusions:
excl_block = f"\nMUST AVOID (allergens/exclusions): {', '.join(str(e) for e in exclusions)}\n"
targets_block = ""
if isinstance(daily_targets, dict) and any(daily_targets.get(k) for k in ("calories", "protein_g", "carbs_g", "fat_g")):
parts = []
for k, label in [("calories", "cal"), ("protein_g", "protein g"), ("carbs_g", "carbs g"), ("fat_g", "fat g")]:
v = daily_targets.get(k)
if v:
parts.append(f"{label}={v}")
if parts:
targets_block = f"\nDAILY TARGETS: {', '.join(parts)} (per day, on average)\n"
already_block = ""
if in_plan_slugs:
already_block = f"\n(For context: this week's plan already has {len(in_plan_slugs)} recipe(s). They're NOT in the pool above.)\n"
prompt = (
"You are Hecate, a Greek-mythology witch goddess of crossroads, "
"herbs, and magic — and the family's meal planner. The household "
"asked you to look through their recipe library and surface "
"'forgotten gems': recipes that fit who they are but haven't "
"been served recently (or ever).\n\n"
f"This is for the week of {week_start}.\n\n"
f"ELIGIBLE POOL (already filtered: nothing seen in the last 90 days, "
f"nothing already on this week's plan):\n{pool_block}\n"
f"{profile_block}"
f"{pref_block}"
f"{targets_block}"
f"{excl_block}"
f"{already_block}"
f"\nPick exactly {target} recipe(s) from the pool above.\n\n"
"Output JSON ONLY, no prose:\n"
'{"suggestions": [{"recipe_slug": "...", "fit_score": 1-5, "reason": "..."}]}\n\n'
"Rules:\n"
f"- Exactly {target} suggestion(s); each recipe_slug MUST be in the pool above\n"
"- VARIETY: don't pick 3 of the same cuisine or 3 of the same primary_protein\n"
"- BIAS toward recipes whose meta best fits the family's picker profiles "
"and any week preference/targets stated above\n"
"- AVOID anything with an exclusion conflict (allergens, dietary)\n"
"- fit_score is YOUR 1-5 confidence this is a hit for THIS family THIS week "
"(5 = strong match, 1 = adventurous reach)\n"
"- reason is one line in Hecate's voice — a witch-goddess remarking "
"on the crossroads. Reference the meta you saw "
"(e.g., \"haven't crossed your table since June — light, herby, "
"fits the picker profile\", \"a quiet salmon pivot from the "
"chicken-heavy week you've been weaving\"). ~12-25 words.\n"
)
result = self.run(prompt, model=model or "sonnet")
suggestions = _extract_suggestions(result)
out = []
seen_slugs: set[str] = set()
for s in suggestions:
slug = (s.get("recipe_slug") or "").strip()
if not slug or slug not in valid_by_slug:
raise ForgeError(f"model output: unknown recipe_slug '{slug}'")
if slug in seen_slugs:
raise ForgeError(f"model output: duplicate suggestion '{slug}'")
seen_slugs.add(slug)
fit = s.get("fit_score")
try:
fit_int = int(fit) if fit is not None else 3
except (TypeError, ValueError):
fit_int = 3
fit_int = max(1, min(5, fit_int))
out.append({
"recipe_slug": slug,
"recipe_name": valid_by_slug[slug],
"fit_score": fit_int,
"reason": (s.get("reason") or "")[:500],
})
if not out:
raise ForgeError("model returned no usable suggestions")
return out
def fetch_food_info(self, name: str, *, model: str | None = None) -> dict:
"""Ask Sonnet for density + unit class + common size of a single
food. Returns a dict shaped like:
@ -1203,6 +1379,30 @@ def _extract_plan_payload(forge_result: dict) -> tuple[list, str]:
raise ForgeError(f"forge result missing 'slots' key: {str(inner)[:200]}")
def _extract_suggestions(forge_result: dict) -> list[dict]:
"""Pull a list of {recipe_slug, fit_score, reason} dicts out of the
suggest_recipes reply. Caller validates each slug against the eligible
pool this just normalizes the shape."""
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")
inner = forge_result.get("result", forge_result)
if isinstance(inner, str):
inner = _parse_json_blob(inner)
if isinstance(inner, dict) and "suggestions" in inner:
items = inner["suggestions"]
elif isinstance(inner, list):
items = inner
else:
raise ForgeError(f"forge result missing 'suggestions' key: {str(inner)[:200]}")
if not isinstance(items, list):
raise ForgeError("'suggestions' must be a list")
out: list[dict] = []
for s in items:
if isinstance(s, dict):
out.append(s)
return out
def _parse_json_blob(s: str):
"""Parse the FIRST balanced JSON value out of a string. Tolerates Sonnet
appending extra prose/notes after the JSON object (which violates the