"""Unit-aware shopping list aggregator. Cobb's killer feature: take ingredients from N recipes, return a single consolidated shopping list with per-food totals. Examples: In: [(2, "cup", "rice"), (1.25, "lb", "rice"), (3, "tbsp", "olive oil")] Out: [("rice", 947, "g"), ("olive oil", 42, "ml")] Mixed mass+volume aggregation uses density from cauldron_foods. If density is unknown or units don't reconcile (count + mass), we split into separate shopping-list lines but group them under one heading. The aggregator is a pure module — no DB or HTTP. Inject a foods_lookup callable: foods_lookup(name) -> {canonical_name, density_g_per_ml, default_unit_class, common_size_g} or None. """ from collections import defaultdict from dataclasses import dataclass, field from decimal import Decimal from typing import Callable, Iterable # ---------- unit tables ---------------------------------------------------- # All conversions normalize to base SI: ml for volume, g for mass. VOLUME_TO_ML = { "ml": 1.0, "milliliter": 1.0, "milliliters": 1.0, "l": 1000.0, "liter": 1000.0, "liters": 1000.0, "tsp": 4.92892, "teaspoon": 4.92892, "teaspoons": 4.92892, "tbsp": 14.7868, "tablespoon": 14.7868, "tablespoons": 14.7868, "fl oz": 29.5735, "fluid ounce": 29.5735, "fluid ounces": 29.5735, "cup": 236.588, "cups": 236.588, "pint": 473.176, "pints": 473.176, "quart": 946.353, "quarts": 946.353, "gallon": 3785.41, "gallons": 3785.41, } MASS_TO_G = { "g": 1.0, "gram": 1.0, "grams": 1.0, "kg": 1000.0, "kilogram": 1000.0, "kilograms": 1000.0, "mg": 0.001, "milligram": 0.001, "milligrams": 0.001, "oz": 28.3495, "ounce": 28.3495, "ounces": 28.3495, "lb": 453.592, "lbs": 453.592, "pound": 453.592, "pounds": 453.592, } # Count-style units. Their qty IS the count; common_size_g resolves to mass. COUNT_UNITS = { "", "each", "ea", "piece", "pieces", "whole", "clove", "cloves", "slice", "slices", "leaf", "leaves", "head", "heads", "bunch", "bunches", "sprig", "sprigs", "stalk", "stalks", "ear", "ears", "can", "cans", "package", "packages", "pkg", "packet", "packets", "bottle", "bottles", "jar", "jars", "box", "boxes", "bag", "bags", } VAGUE_UNITS = { "pinch", "pinches", "dash", "dashes", "handful", "handfuls", "to taste", "as needed", "splash", "drizzle", } def classify_unit(unit: str | None) -> str: u = (unit or "").strip().lower() if u in VOLUME_TO_ML: return "volume" if u in MASS_TO_G: return "mass" if u in VAGUE_UNITS: return "vague" if u in COUNT_UNITS: return "count" return "unknown" def to_ml(qty: float, unit: str) -> float | None: f = VOLUME_TO_ML.get((unit or "").strip().lower()) return qty * f if f is not None else None def to_g(qty: float, unit: str) -> float | None: f = MASS_TO_G.get((unit or "").strip().lower()) return qty * f if f is not None else None def display_mass(g: float) -> tuple[float, str]: """Pick a store-friendly mass display for a quantity in grams.""" if g < 30: return (round(g, 1), "g") if g < 500: return (round(g / 28.3495 * 2) / 2, "oz") # nearest 0.5 oz if g < 2000: return (round(g / 453.592 * 4) / 4, "lb") # nearest 0.25 lb return (round(g / 453.592, 1), "lb") def display_volume(ml: float) -> tuple[float, str]: """Pick a store-friendly volume display for a quantity in ml.""" if ml < 30: return (round(ml / 4.92892, 1), "tsp") if ml < 250: return (round(ml / 14.7868, 1), "tbsp") if ml < 1000: return (round(ml / 236.588, 2), "cup") if ml < 4000: return (round(ml / 946.353, 2), "qt") return (round(ml / 3785.41, 1), "gal") # ---------- model ---------------------------------------------------------- @dataclass class Ingredient: """One line on a recipe — what we feed in.""" qty: float | None unit: str | None food_name: str # raw food name (used for display + Sonnet fallback) mealie_food_id: str | None = None # Mealie's UUID; primary grouping key when present note: str | None = None source_recipe_slug: str | None = None original_text: str | None = None @dataclass class ShoppingLine: """One line on the consolidated shopping list — what we return.""" food: str qty: float | None unit: str contributors: list[str] = field(default_factory=list) # original ingredient texts that fed this line notes: list[str] = field(default_factory=list) # collected notes ("chopped", "minced") is_split: bool = False # True if this is one line of a split (e.g. count + mass for same food) # ---------- core aggregation ----------------------------------------------- def aggregate( ingredients: Iterable[Ingredient], foods_lookup: Callable[[str, str | None], dict | None], ) -> list[ShoppingLine]: """Group ingredients by Mealie food.id (when available) and consolidate quantities. Output is one shopping-list line per food, or N lines per food when units don't reconcile. foods_lookup(food_name, mealie_food_id) returns metadata: {canonical_name, density_g_per_ml, default_unit_class, common_size_g} or None for foods we have no record of. The id-keyed lookup means "rice" in 3 different recipes always groups under one canonical line as long as Mealie has them all linked to the same food row. """ # Step 1: bucket by stable key. Prefer Mealie food.id when present # (guaranteed consistent across recipes for the same food). Fall # back to a normalized name when the ingredient hasn't been linked # to a Mealie food row. by_food: dict[str, list[Ingredient]] = defaultdict(list) food_meta: dict[str, dict] = {} for ing in ingredients: if not ing.food_name and not ing.mealie_food_id: continue # Lookup metadata. Both args passed; lookup decides which is # primary (id-first when set; name as fallback for Sonnet calls). meta = foods_lookup(ing.food_name, ing.mealie_food_id) or { "canonical_name": (ing.food_name or "").strip().lower() or "(unknown)" } # Stable grouping key: id when we have it, normalized name otherwise. key = ing.mealie_food_id or meta.get("canonical_name") or ing.food_name.strip().lower() # Display name: prefer canonical_name from metadata, else the # Mealie food.name we received. canonical_display = meta.get("canonical_name") or (ing.food_name or "").strip().lower() # Stash the display once per group if key not in food_meta: food_meta[key] = {**meta, "canonical_name": canonical_display} by_food[key].append(ing) out: list[ShoppingLine] = [] for key, group in by_food.items(): meta = food_meta[key] out.extend(_aggregate_one_food(meta["canonical_name"], group, meta)) return out def _aggregate_one_food( food: str, items: list[Ingredient], meta: dict, ) -> list[ShoppingLine]: """All ingredients for ONE food → 1+ ShoppingLines.""" # Bucket by unit class buckets: dict[str, list[tuple[Ingredient, float]]] = { "mass": [], "volume": [], "count": [], "vague": [], "unknown": [], } for ing in items: cls = classify_unit(ing.unit) buckets[cls].append((ing, ing.qty if ing.qty is not None else 0.0)) lines: list[ShoppingLine] = [] notes_acc = sorted({i.note.strip() for i in items if i.note and i.note.strip()}) contribs = [ i.original_text or _render(i) for i in items if (i.original_text or i.qty is not None or i.note) ] density = float(meta.get("density_g_per_ml") or 0) or None have_mass = any(qty for _, qty in buckets["mass"]) have_vol = any(qty for _, qty in buckets["volume"]) have_cnt = any(qty for _, qty in buckets["count"]) have_unk = bool(buckets["unknown"]) have_vague = bool(buckets["vague"]) # CASE 1: ONLY one of mass / volume / count present → easy sum classes_present = sum([have_mass, have_vol, have_cnt]) if classes_present == 1 and not have_unk: if have_mass: total_g = sum(to_g(qty, ing.unit) or 0 for ing, qty in buckets["mass"]) q, u = display_mass(total_g) lines.append(ShoppingLine(food=food, qty=q, unit=u, contributors=contribs, notes=notes_acc)) elif have_vol: total_ml = sum(to_ml(qty, ing.unit) or 0 for ing, qty in buckets["volume"]) q, u = display_volume(total_ml) lines.append(ShoppingLine(food=food, qty=q, unit=u, contributors=contribs, notes=notes_acc)) elif have_cnt: total = sum(qty for _, qty in buckets["count"]) unit = buckets["count"][0][0].unit or "ea" lines.append(ShoppingLine(food=food, qty=total, unit=unit, contributors=contribs, notes=notes_acc)) # CASE 2: mass + volume (the killer case) → use density if known elif have_mass and have_vol and not have_cnt and density: total_g = sum(to_g(qty, ing.unit) or 0 for ing, qty in buckets["mass"]) for ing, qty in buckets["volume"]: ml = to_ml(qty, ing.unit) or 0 total_g += ml * density q, u = display_mass(total_g) lines.append(ShoppingLine(food=food, qty=q, unit=u, contributors=contribs, notes=notes_acc)) # CASE 3: count + (mass OR volume) → use common_size_g to convert count elif have_cnt and (have_mass or have_vol): common_size = float(meta.get("common_size_g") or 0) if common_size and (not have_vol or density): total_g = sum(to_g(qty, ing.unit) or 0 for ing, qty in buckets["mass"]) for ing, qty in buckets["volume"]: ml = to_ml(qty, ing.unit) or 0 total_g += ml * (density or 1.0) for _, qty in buckets["count"]: total_g += qty * common_size q, u = display_mass(total_g) lines.append(ShoppingLine(food=food, qty=q, unit=u, contributors=contribs, notes=notes_acc)) else: # Can't convert count cleanly — split into separate lines lines.extend(_split_lines(food, buckets, contribs, notes_acc)) # CASE 4: anything else (mass + volume but no density / mixed unknown) else: lines.extend(_split_lines(food, buckets, contribs, notes_acc)) # Vague-only ingredients always tag onto the food's notes (or stand alone) if have_vague and not lines: lines.append(ShoppingLine(food=food, qty=None, unit="to taste", contributors=contribs, notes=notes_acc + ["to taste"])) elif have_vague and lines: lines[0].notes.append("plus to-taste") # Unknown unit → include verbatim for ing, qty in buckets["unknown"]: lines.append(ShoppingLine( food=food, qty=qty, unit=ing.unit or "?", contributors=[ing.original_text or _render(ing)], notes=[], is_split=True, )) return lines def _split_lines(food, buckets, contribs, notes_acc) -> list[ShoppingLine]: """Fall-back: emit one shopping line per non-empty unit class.""" out = [] if any(qty for _, qty in buckets["mass"]): total_g = sum(to_g(qty, ing.unit) or 0 for ing, qty in buckets["mass"]) q, u = display_mass(total_g) out.append(ShoppingLine(food=food, qty=q, unit=u, contributors=contribs, notes=notes_acc, is_split=True)) if any(qty for _, qty in buckets["volume"]): total_ml = sum(to_ml(qty, ing.unit) or 0 for ing, qty in buckets["volume"]) q, u = display_volume(total_ml) out.append(ShoppingLine(food=food, qty=q, unit=u, contributors=contribs, notes=notes_acc, is_split=True)) if any(qty for _, qty in buckets["count"]): total = sum(qty for _, qty in buckets["count"]) unit = buckets["count"][0][0].unit or "ea" out.append(ShoppingLine(food=food, qty=total, unit=unit, contributors=contribs, notes=notes_acc, is_split=True)) return out def _render(ing: Ingredient) -> str: parts = [] if ing.qty is not None: parts.append(str(ing.qty)) if ing.unit: parts.append(ing.unit) parts.append(ing.food_name) if ing.note: parts.append(f"({ing.note})") return " ".join(parts)