cauldron/cauldron/aggregator.py
Kayos 1c943ec2d8 audit: fix all critical + high findings before dogfood
Six audit-driven fixes from 2026-05-02 punch list at memory/cauldron-codebase-audit.md.

CRITICAL
- F-1 routes: SSRF guard on /api/discover/scrape-start. Every URL is
  validated via discover_recipes.is_public_url() — parses host, rejects
  IP literals in private/loopback/link-local/multicast/reserved ranges,
  resolves hostnames via getaddrinfo and rejects if any A/AAAA is private.
  Defense-in-depth: _scrape_one re-validates before fetch in case any
  future caller bypasses the route. Rejected URLs are returned in the
  response payload so the user knows which were skipped.
- F-6 domain: prompt-injection mitigation on enrich_recipe + verify_allergens.
  New apply_allergen_safety_override() in forge.py runs regex pattern-
  matching against the raw ingredient text for the SEVEN anaphylaxis-class
  allergens (peanuts, nuts, shellfish, fish, eggs, sesame, dairy). On
  match, force contains.<allergen>=TRUE regardless of Sonnet output. False
  positives are recoverable; undetected anaphylaxis is not. Pork/soy/
  gluten not auto-overridden (religious/dietary or too-common).
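
A minimal sketch of the F-1 check described above, not the actual discover_recipes.is_public_url(). The injectable `resolve` parameter and both helper names are assumptions added for illustration; the real signature is not shown in this commit.

```python
import ipaddress
import socket
from typing import Callable, Iterable
from urllib.parse import urlsplit


def _system_resolver(host: str) -> list:
    """Return every A/AAAA address getaddrinfo reports for host."""
    infos = socket.getaddrinfo(host, None, proto=socket.IPPROTO_TCP)
    return [info[4][0] for info in infos]


def _is_public_ip(text: str) -> bool:
    """True if text is an IP outside the blocked ranges; ValueError if not an IP."""
    ip = ipaddress.ip_address(text)
    # Judge IPv4-mapped IPv6 addresses (::ffff:a.b.c.d) by the embedded IPv4.
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        ip = mapped
    return not (
        ip.is_private or ip.is_loopback or ip.is_link_local
        or ip.is_multicast or ip.is_reserved or ip.is_unspecified
    )


def is_public_url(
    url: str,
    resolve: Callable[[str], Iterable[str]] = _system_resolver,  # hypothetical hook
) -> bool:
    try:
        parts = urlsplit(url)
        host = parts.hostname
    except ValueError:
        return False
    if parts.scheme not in ("http", "https") or not host:
        return False
    try:
        return _is_public_ip(host)      # IP literal: no DNS lookup needed
    except ValueError:
        pass                            # not an IP literal, so resolve it
    try:
        addresses = list(resolve(host))
        # Reject if ANY resolved address is non-public.
        return bool(addresses) and all(_is_public_ip(a) for a in addresses)
    except (OSError, ValueError):
        return False
```

A check like this does not by itself close the gap between validation and fetch (a hostname can resolve differently a moment later), which is presumably part of why _scrape_one re-validates immediately before fetching.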

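The F-6 override can be sketched as follows. The pattern table here is hypothetical and deliberately incomplete (the real patterns in forge.py are not shown), and the function signature is an assumption; only the "force TRUE on match, never force FALSE" behavior comes from the description above.

```python
import re

# Hypothetical, abbreviated patterns for illustration only.
ALLERGEN_PATTERNS = {
    "peanuts": re.compile(r"\bpeanuts?\b", re.IGNORECASE),
    "shellfish": re.compile(r"\b(shellfish|shrimp|prawns?|crab|lobster)\b", re.IGNORECASE),
    "fish": re.compile(r"\b(fish|salmon|tuna|cod|anchov(?:y|ies))\b", re.IGNORECASE),
    "sesame": re.compile(r"\b(sesame|tahini)\b", re.IGNORECASE),
}


def apply_allergen_safety_override(ingredient_text: str, contains: dict) -> dict:
    """Return a copy of the model's `contains` flags with regex hits forced to True.

    Flags are only ever raised, never lowered, so a TRUE from the model
    survives even when no pattern matches.
    """
    out = dict(contains)
    for allergen, pattern in ALLERGEN_PATTERNS.items():
        if pattern.search(ingredient_text):
            out[allergen] = True
    return out


flipped = apply_allergen_safety_override(
    "2 tbsp peanut butter, 1 lb chicken", {"peanuts": False, "fish": False}
)
preserved = apply_allergen_safety_override(
    "1 cup rice", {"eggs": True, "peanuts": False}
)
```

Because the override runs on the raw ingredient text rather than on model output, injected instructions in a recipe cannot talk it out of a match.
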
HIGH
- F-2 routes: /api/discover/reject swapped from global status flip to
  per-household scope. New migration 039 cauldron_discover_skips
  (discover_id, household_id, skipped_by_sub, skipped_at) join table.
  list_discovered_recipes default view filters out caller-household
  skips; ?status=skipped surfaces them for unskip. Different households
  have different tastes.
- F-3a routes: /login?next= same-origin validation. Reject anything that
  doesn't start with `/`, AND reject `//evil.example` protocol-relative
  redirects. One-line fix.
- F-10 domain: Sterilizer.apply_recipe ingredient-count guard. Refuse to
  apply if Mealie's current recipeIngredient length differs from the
  preview's proposals length. Python's zip would silently truncate;
  user edits made during the 60-300s Sonnet window now raise
  RuntimeError instead of getting clobbered. Bulk runner already catches
  RuntimeError per-recipe, marks proposal stale.
- F-15 domain: aggregator qty=None safety net. Ingredients with no
  quantity now go to a separate no_qty_items list instead of being
  silently coerced to 0.0 (which then failed the `any(qty for ...)`
  truthiness check and dropped the food off the shopping list). If no
  other line was emitted, write a "qty unspecified" placeholder so the
  food APPEARS on the list. If a sized line WAS emitted, append a
  "+ N ingredient(s) with no quantity" note.

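To illustrate the F-10 guard: zip stops at the shorter input, so a length mismatch silently drops trailing ingredients. The sketch below uses a hypothetical `apply_proposals` helper and plain dicts, not the real Sterilizer.apply_recipe.

```python
def apply_proposals(current: list, proposals: list) -> list:
    """Merge each proposal into its ingredient, refusing stale previews."""
    if len(current) != len(proposals):
        raise RuntimeError(
            f"ingredient count changed: recipe has {len(current)}, "
            f"preview has {len(proposals)}; proposal is stale"
        )
    return [{**ing, **prop} for ing, prop in zip(current, proposals)]


current = [{"food": "rice"}, {"food": "salt"}, {"food": "oil"}]
proposals = [{"qty": 2.0}, {"qty": 1.0}]  # preview built before "oil" was added

# Without the guard, zip pairs only the first two and "oil" is lost.
silently_truncated = list(zip(current, proposals))
```

On Python 3.10+, `zip(current, proposals, strict=True)` raises ValueError on a mismatch; the explicit length check keeps the RuntimeError that the bulk runner is described as catching.
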
ALSO (one-liners called out in the punch list)
- Migration 029 DROP INDEX gets IF EXISTS — prevents boot-brick on
  partial-failure retry.
- Flavor B prefix prompt rule — Sonnet now told to keep `lib:`/`disc:`
  prefix verbatim; prevents intermittent 502s on the panel just shipped.
- list_discover_eligible_for_group switched from LEFT JOIN to NOT EXISTS
  subqueries — fixes F-5 data (LIMIT-shrink from cross-group import
  multiplication) and adds the per-household skip filter cleanly.
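
The LIMIT-shrink effect can be reproduced with a toy schema. The table and column names below are hypothetical stand-ins, and the LEFT JOIN query is a guess at the general shape of the old query, not a copy of it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE discover (id INTEGER PRIMARY KEY, title TEXT);
    CREATE TABLE imports  (discover_id INTEGER, group_id TEXT);
    CREATE TABLE skips    (discover_id INTEGER, household_id TEXT);
    INSERT INTO discover VALUES (1, 'dal'), (2, 'ramen'), (3, 'tacos'), (4, 'pho');
    -- Recipe 1 was imported by two OTHER groups; recipe 2 by group A itself.
    INSERT INTO imports VALUES (1, 'B'), (1, 'C'), (2, 'A');
    INSERT INTO skips VALUES (3, 'hh1');
""")

# LEFT JOIN: recipe 1 appears once per foreign import, so LIMIT 2 is
# consumed by duplicates of a single recipe.
left_join_ids = [row[0] for row in conn.execute("""
    SELECT d.id FROM discover d
    LEFT JOIN imports i ON i.discover_id = d.id
    WHERE i.group_id IS NULL OR i.group_id != :g
    ORDER BY d.id LIMIT 2
""", {"g": "A"})]

# NOT EXISTS: one row per recipe, and the per-household skip filter is
# just another subquery.
not_exists_ids = [row[0] for row in conn.execute("""
    SELECT d.id FROM discover d
    WHERE NOT EXISTS (SELECT 1 FROM imports i
                      WHERE i.discover_id = d.id AND i.group_id = :g)
      AND NOT EXISTS (SELECT 1 FROM skips s
                      WHERE s.discover_id = d.id AND s.household_id = :h)
    ORDER BY d.id LIMIT 2
""", {"g": "A", "h": "hh1"})]
```

With this data the LEFT JOIN form yields recipe 1 twice, while the NOT EXISTS form yields two distinct recipes (1 and 4), with recipe 3 hidden by the household skip.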

All edits AST-verified. Allergen regex tested with peanut/fish/clean
inputs — flips correctly, preserves Sonnet TRUEs, no over-broad coverage.

Mediums + lows from the audit are tracked in
memory/cauldron-codebase-audit.md and deferred until Cobb hits them
during dogfood.
2026-05-02 12:43:04 -07:00


"""Unit-aware shopping list aggregator.

Cobb's killer feature: take ingredients from N recipes, return a single
consolidated shopping list with per-food totals.

Examples:
    In:  [(2, "cup", "rice"), (1.25, "lb", "rice"), (3, "tbsp", "olive oil")]
    Out: [("rice", 947, "g"), ("olive oil", 44, "ml")]

    Totals above are shown in base units for clarity. The returned
    ShoppingLine objects carry store-friendly display units instead
    (here about 2 lb of rice and 3 tbsp of olive oil).

Mixed mass+volume aggregation uses density from cauldron_foods. If density
is unknown or units don't reconcile (count + mass), we split into separate
shopping-list lines but group them under one heading.

The aggregator is a pure module: no DB or HTTP. Inject a foods_lookup
callable: foods_lookup(food_name, mealie_food_id) -> {canonical_name,
density_g_per_ml, default_unit_class, common_size_g} or None.
"""
from collections import defaultdict
from dataclasses import dataclass, field
from decimal import Decimal
from typing import Callable, Iterable

# ---------- unit tables ----------------------------------------------------
# All conversions normalize to base SI: ml for volume, g for mass.
VOLUME_TO_ML = {
    "ml": 1.0, "milliliter": 1.0, "milliliters": 1.0,
    "l": 1000.0, "liter": 1000.0, "liters": 1000.0,
    "tsp": 4.92892, "teaspoon": 4.92892, "teaspoons": 4.92892,
    "tbsp": 14.7868, "tablespoon": 14.7868, "tablespoons": 14.7868,
    "fl oz": 29.5735, "fluid ounce": 29.5735, "fluid ounces": 29.5735,
    "cup": 236.588, "cups": 236.588,
    "pint": 473.176, "pints": 473.176,
    "quart": 946.353, "quarts": 946.353,
    "gallon": 3785.41, "gallons": 3785.41,
}

MASS_TO_G = {
    "g": 1.0, "gram": 1.0, "grams": 1.0,
    "kg": 1000.0, "kilogram": 1000.0, "kilograms": 1000.0,
    "mg": 0.001, "milligram": 0.001, "milligrams": 0.001,
    "oz": 28.3495, "ounce": 28.3495, "ounces": 28.3495,
    "lb": 453.592, "lbs": 453.592, "pound": 453.592, "pounds": 453.592,
}

# Count-style units. Their qty IS the count; common_size_g resolves to mass.
COUNT_UNITS = {
    "", "each", "ea", "piece", "pieces", "whole",
    "clove", "cloves", "slice", "slices", "leaf", "leaves",
    "head", "heads", "bunch", "bunches", "sprig", "sprigs",
    "stalk", "stalks", "ear", "ears",
    "can", "cans", "package", "packages", "pkg", "packet", "packets",
    "bottle", "bottles", "jar", "jars", "box", "boxes", "bag", "bags",
}

VAGUE_UNITS = {
    "pinch", "pinches", "dash", "dashes", "handful", "handfuls",
    "to taste", "as needed", "splash", "drizzle",
}


def classify_unit(unit: str | None) -> str:
    u = (unit or "").strip().lower()
    if u in VOLUME_TO_ML:
        return "volume"
    if u in MASS_TO_G:
        return "mass"
    if u in VAGUE_UNITS:
        return "vague"
    if u in COUNT_UNITS:
        return "count"
    return "unknown"


def to_ml(qty: float, unit: str) -> float | None:
    f = VOLUME_TO_ML.get((unit or "").strip().lower())
    return qty * f if f is not None else None


def to_g(qty: float, unit: str) -> float | None:
    f = MASS_TO_G.get((unit or "").strip().lower())
    return qty * f if f is not None else None


def display_mass(g: float) -> tuple[float, str]:
    """Pick a store-friendly mass display for a quantity in grams."""
    if g < 30:
        return (round(g, 1), "g")
    if g < 500:
        return (round(g / 28.3495 * 2) / 2, "oz")  # nearest 0.5 oz
    if g < 2000:
        return (round(g / 453.592 * 4) / 4, "lb")  # nearest 0.25 lb
    return (round(g / 453.592, 1), "lb")


def display_volume(ml: float) -> tuple[float, str]:
    """Pick a store-friendly volume display for a quantity in ml."""
    if ml < 30:
        return (round(ml / 4.92892, 1), "tsp")
    if ml < 250:
        return (round(ml / 14.7868, 1), "tbsp")
    if ml < 1000:
        return (round(ml / 236.588, 2), "cup")
    if ml < 4000:
        return (round(ml / 946.353, 2), "qt")
    return (round(ml / 3785.41, 1), "gal")


# ---------- model ----------------------------------------------------------
@dataclass
class Ingredient:
    """One line on a recipe — what we feed in."""
    qty: float | None
    unit: str | None
    food_name: str  # raw food name (used for display + Sonnet fallback)
    mealie_food_id: str | None = None  # Mealie's UUID; primary grouping key when present
    note: str | None = None
    source_recipe_slug: str | None = None
    original_text: str | None = None


@dataclass
class ShoppingLine:
    """One line on the consolidated shopping list — what we return."""
    food: str
    qty: float | None
    unit: str
    contributors: list[str] = field(default_factory=list)  # original ingredient texts that fed this line
    notes: list[str] = field(default_factory=list)  # collected notes ("chopped", "minced")
    is_split: bool = False  # True if this is one line of a split (e.g. count + mass for same food)


# ---------- core aggregation -----------------------------------------------
def aggregate(
    ingredients: Iterable[Ingredient],
    foods_lookup: Callable[[str, str | None], dict | None],
) -> list[ShoppingLine]:
    """Group ingredients by Mealie food.id (when available) and consolidate
    quantities. Output is one shopping-list line per food, or N lines per
    food when units don't reconcile.

    foods_lookup(food_name, mealie_food_id) returns metadata:
        {canonical_name, density_g_per_ml, default_unit_class, common_size_g}
    or None for foods we have no record of. The id-keyed lookup means
    "rice" in 3 different recipes always groups under one canonical line
    as long as Mealie has them all linked to the same food row.
    """
    # Step 1: bucket by stable key. Prefer Mealie food.id when present
    # (guaranteed consistent across recipes for the same food). Fall
    # back to a normalized name when the ingredient hasn't been linked
    # to a Mealie food row.
    by_food: dict[str, list[Ingredient]] = defaultdict(list)
    food_meta: dict[str, dict] = {}
    for ing in ingredients:
        if not ing.food_name and not ing.mealie_food_id:
            continue
        # Lookup metadata. Both args passed; lookup decides which is
        # primary (id-first when set; name as fallback for Sonnet calls).
        meta = foods_lookup(ing.food_name, ing.mealie_food_id) or {
            "canonical_name": (ing.food_name or "").strip().lower() or "(unknown)"
        }
        # Stable grouping key: id when we have it, normalized name otherwise.
        key = ing.mealie_food_id or meta.get("canonical_name") or ing.food_name.strip().lower()
        # Display name: prefer canonical_name from metadata, else the
        # Mealie food.name we received.
        canonical_display = meta.get("canonical_name") or (ing.food_name or "").strip().lower()
        # Stash the display once per group
        if key not in food_meta:
            food_meta[key] = {**meta, "canonical_name": canonical_display}
        by_food[key].append(ing)

    out: list[ShoppingLine] = []
    for key, group in by_food.items():
        meta = food_meta[key]
        out.extend(_aggregate_one_food(meta["canonical_name"], group, meta))
    return out


def _aggregate_one_food(
    food: str,
    items: list[Ingredient],
    meta: dict,
) -> list[ShoppingLine]:
    """All ingredients for ONE food → 1+ ShoppingLines."""
    # Bucket by unit class. Ingredients with qty=None go to a separate
    # `no_qty` bucket so they DON'T silently disappear from the shopping
    # list when Mealie's parser couldn't extract a number (audit F-15
    # domain, 2026-05-02). The killer feature should surface "buy onion"
    # even if the source recipe just said "onion, chopped" without a
    # parseable quantity.
    buckets: dict[str, list[tuple[Ingredient, float]]] = {
        "mass": [], "volume": [], "count": [], "vague": [], "unknown": [],
    }
    no_qty_items: list[Ingredient] = []
    for ing in items:
        cls = classify_unit(ing.unit)
        # A missing qty on a summable class (mass/volume/count; the empty
        # unit classifies as count) would be coerced to 0.0, fail the
        # truthiness checks below, and drop the food. Route it to the
        # safety net instead. Vague and unknown units are detected by
        # bucket membership, so they keep the 0.0 placeholder.
        if ing.qty is None and cls in ("mass", "volume", "count"):
            no_qty_items.append(ing)
            continue
        buckets[cls].append((ing, ing.qty if ing.qty is not None else 0.0))

    lines: list[ShoppingLine] = []
    notes_acc = sorted({i.note.strip() for i in items if i.note and i.note.strip()})
    contribs = [
        i.original_text or _render(i)
        for i in items
        if (i.original_text or i.qty is not None or i.note)
    ]
    # Pull no-qty original-text contributors into the contribs list so
    # they appear under whatever line we emit (or the standalone fallback)
    for ing in no_qty_items:
        text = ing.original_text or _render(ing)
        if text and text not in contribs:
            contribs.append(text)

    density = float(meta.get("density_g_per_ml") or 0) or None
    have_mass = any(qty for _, qty in buckets["mass"])
    have_vol = any(qty for _, qty in buckets["volume"])
    have_cnt = any(qty for _, qty in buckets["count"])
    have_unk = bool(buckets["unknown"])
    have_vague = bool(buckets["vague"])

    # CASE 1: ONLY one of mass / volume / count present → easy sum
    classes_present = sum([have_mass, have_vol, have_cnt])
    if classes_present == 1 and not have_unk:
        if have_mass:
            total_g = sum(to_g(qty, ing.unit) or 0 for ing, qty in buckets["mass"])
            q, u = display_mass(total_g)
            lines.append(ShoppingLine(food=food, qty=q, unit=u, contributors=contribs, notes=notes_acc))
        elif have_vol:
            total_ml = sum(to_ml(qty, ing.unit) or 0 for ing, qty in buckets["volume"])
            q, u = display_volume(total_ml)
            lines.append(ShoppingLine(food=food, qty=q, unit=u, contributors=contribs, notes=notes_acc))
        elif have_cnt:
            total = sum(qty for _, qty in buckets["count"])
            unit = buckets["count"][0][0].unit or "ea"
            lines.append(ShoppingLine(food=food, qty=total, unit=unit, contributors=contribs, notes=notes_acc))
    # CASE 2: mass + volume (the killer case) → use density if known
    elif have_mass and have_vol and not have_cnt and density:
        total_g = sum(to_g(qty, ing.unit) or 0 for ing, qty in buckets["mass"])
        for ing, qty in buckets["volume"]:
            ml = to_ml(qty, ing.unit) or 0
            total_g += ml * density
        q, u = display_mass(total_g)
        lines.append(ShoppingLine(food=food, qty=q, unit=u, contributors=contribs, notes=notes_acc))
    # CASE 3: count + (mass OR volume) → use common_size_g to convert count
    elif have_cnt and (have_mass or have_vol):
        common_size = float(meta.get("common_size_g") or 0)
        if common_size and (not have_vol or density):
            total_g = sum(to_g(qty, ing.unit) or 0 for ing, qty in buckets["mass"])
            for ing, qty in buckets["volume"]:
                ml = to_ml(qty, ing.unit) or 0
                total_g += ml * (density or 1.0)
            for _, qty in buckets["count"]:
                total_g += qty * common_size
            q, u = display_mass(total_g)
            lines.append(ShoppingLine(food=food, qty=q, unit=u, contributors=contribs, notes=notes_acc))
        else:
            # Can't convert count cleanly: split into separate lines
            lines.extend(_split_lines(food, buckets, contribs, notes_acc))
    # CASE 4: anything else (mass + volume but no density / mixed unknown)
    else:
        lines.extend(_split_lines(food, buckets, contribs, notes_acc))

    # Vague-only ingredients always tag onto the food's notes (or stand alone)
    if have_vague and not lines:
        lines.append(ShoppingLine(food=food, qty=None, unit="to taste",
                                  contributors=contribs, notes=notes_acc + ["to taste"]))
    elif have_vague and lines:
        lines[0].notes.append("plus to-taste")

    # Unknown unit → include verbatim
    for ing, qty in buckets["unknown"]:
        lines.append(ShoppingLine(
            food=food, qty=qty, unit=ing.unit or "?",
            contributors=[ing.original_text or _render(ing)], notes=[],
            is_split=True,
        ))

    # qty=None safety net: if every contributor was a no-qty ingredient
    # (Mealie's parser couldn't extract a number), nothing else above
    # produced a line. Emit a placeholder so the food APPEARS on the
    # shopping list. Abby still needs to know to buy onions even if the
    # recipe just said "onion, chopped". UI surfaces this as a
    # "qty unspecified" hint, nudging Cobb to run sterilize.
    if no_qty_items and not lines:
        lines.append(ShoppingLine(
            food=food, qty=None, unit="ea",
            contributors=contribs,
            notes=notes_acc + ["quantity unspecified — re-sterilize for an exact total"],
        ))
    elif no_qty_items and lines:
        # We DID emit a sized line, but still flag that some contributors
        # had unknown qty so the user knows the total may be incomplete.
        lines[0].notes.append(
            f"+ {len(no_qty_items)} ingredient(s) with no quantity"
        )
    return lines


def _split_lines(food, buckets, contribs, notes_acc) -> list[ShoppingLine]:
    """Fall-back: emit one shopping line per non-empty unit class."""
    # Each line gets its own copy of contributors/notes. The caller later
    # appends to lines[0].notes, and a shared list would leak that note
    # onto every split line.
    out = []
    if any(qty for _, qty in buckets["mass"]):
        total_g = sum(to_g(qty, ing.unit) or 0 for ing, qty in buckets["mass"])
        q, u = display_mass(total_g)
        out.append(ShoppingLine(food=food, qty=q, unit=u, contributors=list(contribs),
                                notes=list(notes_acc), is_split=True))
    if any(qty for _, qty in buckets["volume"]):
        total_ml = sum(to_ml(qty, ing.unit) or 0 for ing, qty in buckets["volume"])
        q, u = display_volume(total_ml)
        out.append(ShoppingLine(food=food, qty=q, unit=u, contributors=list(contribs),
                                notes=list(notes_acc), is_split=True))
    if any(qty for _, qty in buckets["count"]):
        total = sum(qty for _, qty in buckets["count"])
        unit = buckets["count"][0][0].unit or "ea"
        out.append(ShoppingLine(food=food, qty=total, unit=unit, contributors=list(contribs),
                                notes=list(notes_acc), is_split=True))
    return out


def _render(ing: Ingredient) -> str:
    parts = []
    if ing.qty is not None:
        parts.append(str(ing.qty))
    if ing.unit:
        parts.append(ing.unit)
    parts.append(ing.food_name)
    if ing.note:
        parts.append(f"({ing.note})")
    return " ".join(parts)