Pure-Python module + 14 unit tests proving the centerpiece works:
test_rice_mixed:
in: [(2 cup, rice), (1.25 lb, rice)]
out: 2.25 lb rice (one line, properly mass+volume combined via density)
test_butter_mixed:
in: [(0.5 cup, butter), (4 oz, butter)]
out: ~227g butter (~8oz / 0.5 lb)
test_three_recipes:
feeds 9 ingredients across 3 recipes through the aggregator;
rice (cup + lb) collapses, garlic (cloves) sums, eggs count, salt as 'pinch'
bucketed as to-taste. All on one shopping list.
Algorithm in cauldron/aggregator.py:
1. Bucket ingredients by canonical food (foods_lookup callable injected — no DB coupling)
2. Within each food, classify each unit (mass / volume / count / vague / unknown)
3. CASE 1: only one unit class present → simple sum, display in canonical store-friendly unit
4. CASE 2: mass + volume (the killer) → use density_g_per_ml to combine to grams
5. CASE 3: count + (mass | volume) → use common_size_g to convert count to grams
6. CASE 4: anything that can't reconcile (no density, mixed unknown) → split into 1 line per class with is_split=True
7. vague (pinch, dash, to taste) → annotate as 'plus to-taste'
8. unknown units → emit verbatim with the original text
Display: store-friendly unit picker:
<30g → grams
<500g → ounces (nearest 0.5)
<2kg → pounds (nearest 0.25)
≥2kg → pounds (nearest 0.1)
The aggregator is dependency-injection-friendly — foods_lookup(name) is
the only external call. Tests pass a stub dict; production will pass
foods.search_food(db, name). Decouples math from data quality.
Tests run via:
python3 -m unittest discover -s tests -v
288 lines
11 KiB
Python
288 lines
11 KiB
Python
"""Unit-aware shopping list aggregator.
|
|
|
|
Cobb's killer feature: take ingredients from N recipes, return a single
|
|
consolidated shopping list with per-food totals.
|
|
|
|
Examples:
|
|
In: [(2, "cup", "rice"), (1.25, "lb", "rice"), (3, "tbsp", "olive oil")]
|
|
Out: [("rice", 947, "g"), ("olive oil", 42, "ml")]
|
|
|
|
Mixed mass+volume aggregation uses density from cauldron_foods. If density
|
|
is unknown or units don't reconcile (count + mass), we split into separate
|
|
shopping-list lines but group them under one heading.
|
|
|
|
The aggregator is a pure module — no DB or HTTP. Inject a foods_lookup
|
|
callable: foods_lookup(name) -> {canonical_name, density_g_per_ml,
|
|
default_unit_class, common_size_g} or None.
|
|
"""
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass, field
|
|
from decimal import Decimal
|
|
from typing import Callable, Iterable
|
|
|
|
|
|
# ---------- unit tables ----------------------------------------------------
|
|
|
|
# All conversions normalize to metric working units: ml for volume, g for mass.
# Lookups lower-case and strip the unit first, so every key here is lowercase.
VOLUME_TO_ML = {
    "ml": 1.0, "milliliter": 1.0, "milliliters": 1.0,
    "l": 1000.0, "liter": 1000.0, "liters": 1000.0,
    "tsp": 4.92892, "teaspoon": 4.92892, "teaspoons": 4.92892,
    "tbsp": 14.7868, "tablespoon": 14.7868, "tablespoons": 14.7868,
    "fl oz": 29.5735, "fluid ounce": 29.5735, "fluid ounces": 29.5735,
    "cup": 236.588, "cups": 236.588,
    # "pt" / "qt" / "gal" are included so that the abbreviations emitted by
    # display_volume ("qt", "gal") are themselves recognised as volume units.
    "pint": 473.176, "pints": 473.176, "pt": 473.176,
    "quart": 946.353, "quarts": 946.353, "qt": 946.353,
    "gallon": 3785.41, "gallons": 3785.41, "gal": 3785.41,
}

MASS_TO_G = {
    "g": 1.0, "gram": 1.0, "grams": 1.0,
    "kg": 1000.0, "kilogram": 1000.0, "kilograms": 1000.0,
    "mg": 0.001, "milligram": 0.001, "milligrams": 0.001,
    "oz": 28.3495, "ounce": 28.3495, "ounces": 28.3495,
    "lb": 453.592, "lbs": 453.592, "pound": 453.592, "pounds": 453.592,
}

# Count-style units. Their qty IS the count; common_size_g resolves to mass.
# The empty string covers unit-less ingredients such as "2 eggs".
COUNT_UNITS = {
    "", "each", "ea", "piece", "pieces", "whole",
    "clove", "cloves", "slice", "slices", "leaf", "leaves",
    "head", "heads", "bunch", "bunches", "sprig", "sprigs",
    "stalk", "stalks", "ear", "ears",
    "can", "cans", "package", "packages", "pkg", "packet", "packets",
    "bottle", "bottles", "jar", "jars", "box", "boxes", "bag", "bags",
}

# Amounts that cannot be summed; they are reported as "to taste" instead.
VAGUE_UNITS = {
    "pinch", "pinches", "dash", "dashes", "handful", "handfuls",
    "to taste", "as needed", "splash", "drizzle",
}
|
|
|
|
|
|
def classify_unit(unit: str | None) -> str:
    """Return the unit class of *unit*.

    The unit is stripped and lower-cased (``None`` is treated as ``""``) and
    then looked up in the module's unit tables. The first table containing
    it wins, checked in this order: volume, mass, vague, count. A unit found
    in none of them is ``"unknown"``.
    """
    key = unit.strip().lower() if unit else ""
    tables = (
        ("volume", VOLUME_TO_ML),
        ("mass", MASS_TO_G),
        ("vague", VAGUE_UNITS),
        ("count", COUNT_UNITS),
    )
    for label, table in tables:
        if key in table:
            return label
    return "unknown"
|
|
|
|
|
|
def to_ml(qty: float, unit: str) -> float | None:
    """Convert *qty* expressed in a volume *unit* to millilitres.

    The unit is stripped and lower-cased before the lookup. Returns ``None``
    when it has no entry in ``VOLUME_TO_ML``.
    """
    key = (unit or "").strip().lower()
    ml_per_unit = VOLUME_TO_ML.get(key)
    if ml_per_unit is None:
        return None
    return qty * ml_per_unit
|
|
|
|
|
|
def to_g(qty: float, unit: str) -> float | None:
    """Convert *qty* expressed in a mass *unit* to grams.

    The unit is stripped and lower-cased before the lookup. Returns ``None``
    when it has no entry in ``MASS_TO_G``.
    """
    key = (unit or "").strip().lower()
    grams_per_unit = MASS_TO_G.get(key)
    if grams_per_unit is None:
        return None
    return qty * grams_per_unit
|
|
|
|
|
|
def display_mass(g: float) -> tuple[float, str]:
    """Choose a store-friendly ``(amount, unit)`` for a mass given in grams.

    Bands:
        under 30 g    -> grams, one decimal
        under 500 g   -> ounces, nearest 0.5
        under 2000 g  -> pounds, nearest 0.25
        otherwise     -> pounds, one decimal
    """
    grams_per_oz = 28.3495
    grams_per_lb = 453.592

    if g < 30:
        amount, unit = round(g, 1), "g"
    elif g < 500:
        # Round the number of half-ounces, then scale back to ounces.
        half_ounces = round(g / grams_per_oz * 2)
        amount, unit = half_ounces / 2, "oz"
    elif g < 2000:
        # Round the number of quarter-pounds, then scale back to pounds.
        quarter_pounds = round(g / grams_per_lb * 4)
        amount, unit = quarter_pounds / 4, "lb"
    else:
        amount, unit = round(g / grams_per_lb, 1), "lb"
    return (amount, unit)
|
|
|
|
|
|
def display_volume(ml: float) -> tuple[float, str]:
    """Choose a store-friendly ``(amount, unit)`` for a volume given in ml.

    The first band whose upper bound exceeds *ml* is used; anything at or
    above 4000 ml is reported in gallons.
    """
    # (exclusive upper bound in ml, ml per unit, decimals kept, unit label)
    ladder = (
        (30, 4.92892, 1, "tsp"),
        (250, 14.7868, 1, "tbsp"),
        (1000, 236.588, 2, "cup"),
        (4000, 946.353, 2, "qt"),
    )
    for upper_ml, ml_per_unit, decimals, label in ladder:
        if ml < upper_ml:
            return (round(ml / ml_per_unit, decimals), label)
    return (round(ml / 3785.41, 1), "gal")
|
|
|
|
|
|
# ---------- model ----------------------------------------------------------
|
|
|
|
|
|
@dataclass
|
|
class Ingredient:
|
|
"""One line on a recipe — what we feed in."""
|
|
qty: float | None
|
|
unit: str | None
|
|
food_name: str # raw food name (will be canonicalized via lookup)
|
|
note: str | None = None
|
|
source_recipe_slug: str | None = None
|
|
original_text: str | None = None
|
|
|
|
|
|
@dataclass
|
|
class ShoppingLine:
|
|
"""One line on the consolidated shopping list — what we return."""
|
|
food: str
|
|
qty: float | None
|
|
unit: str
|
|
contributors: list[str] = field(default_factory=list) # original ingredient texts that fed this line
|
|
notes: list[str] = field(default_factory=list) # collected notes ("chopped", "minced")
|
|
is_split: bool = False # True if this is one line of a split (e.g. count + mass for same food)
|
|
|
|
|
|
# ---------- core aggregation -----------------------------------------------
|
|
|
|
|
|
def aggregate(
    ingredients: Iterable[Ingredient],
    foods_lookup: Callable[[str], dict | None],
) -> list[ShoppingLine]:
    """Consolidate recipe ingredients into one shopping list.

    Ingredients are grouped by canonical food name (in first-seen order) and
    each group is reduced to one or more lines by ``_aggregate_one_food``.

    Args:
        ingredients: Ingredient lines from any number of recipes.
        foods_lookup: ``foods_lookup(name)`` returns a dict with
            ``canonical_name``, ``density_g_per_ml``, ``default_unit_class``
            and ``common_size_g``, or ``None`` for unknown foods. It is
            called with the raw ``food_name``.

    Returns:
        The shopping lines for every food, one food after another.
    """
    by_food: dict[str, list[Ingredient]] = defaultdict(list)
    food_meta: dict[str, dict] = {}

    for ing in ingredients:
        # Used when the lookup gives no canonical name. Whitespace-only names
        # are skipped like empty ones, so no line is emitted for a food "".
        fallback_name = (ing.food_name or "").strip().lower()
        if not fallback_name:
            continue

        found = foods_lookup(ing.food_name) or {}
        canonical = found.get("canonical_name") or fallback_name
        by_food[canonical].append(ing)

        # Merge metadata instead of overwriting: a later lookup miss that
        # falls back to the same canonical name must not erase a density or
        # common size learned from an earlier hit.
        known = food_meta.setdefault(canonical, {"canonical_name": canonical})
        for key, value in found.items():
            if value is not None and known.get(key) is None:
                known[key] = value

    out: list[ShoppingLine] = []
    for food, group in by_food.items():
        out.extend(_aggregate_one_food(food, group, food_meta[food]))
    return out
|
|
|
|
|
|
def _aggregate_one_food(
    food: str,
    items: list[Ingredient],
    meta: dict,
) -> list[ShoppingLine]:
    """Collapse all ingredients for ONE food into one or more ShoppingLines.

    Args:
        food: Canonical food name used on every emitted line.
        items: Every ingredient that resolved to this food.
        meta: Food metadata. Only ``density_g_per_ml`` and ``common_size_g``
            are read; a missing or zero value means "unknown".

    Returns:
        Lines in this order: the quantified line(s); a "to taste" line when
        only vague amounts exist; one verbatim line per unknown-unit
        ingredient. A food that has ingredients but no usable quantity at
        all still gets one quantity-less line, so it is never dropped from
        the list.
    """
    # Bucket by unit class. A missing quantity is stored as 0 so it never
    # contributes to a sum and never makes a class count as "present".
    buckets: dict[str, list[tuple[Ingredient, float]]] = {
        "mass": [], "volume": [], "count": [], "vague": [], "unknown": [],
    }
    for ing in items:
        cls = classify_unit(ing.unit)
        buckets[cls].append((ing, ing.qty if ing.qty is not None else 0.0))

    notes_acc = sorted({i.note.strip() for i in items if i.note and i.note.strip()})
    contribs = [
        i.original_text or _render(i)
        for i in items
        if (i.original_text or i.qty is not None or i.note)
    ]

    density = float(meta.get("density_g_per_ml") or 0) or None
    common_size = float(meta.get("common_size_g") or 0)

    have_mass = any(qty for _, qty in buckets["mass"])
    have_vol = any(qty for _, qty in buckets["volume"])
    have_cnt = any(qty for _, qty in buckets["count"])
    have_unk = bool(buckets["unknown"])
    have_vague = bool(buckets["vague"])

    def combined_line(qty: float | None, unit: str) -> ShoppingLine:
        # Each line gets its own list copies so editing one line's notes or
        # contributors can never affect another line.
        return ShoppingLine(
            food=food, qty=qty, unit=unit,
            contributors=list(contribs), notes=list(notes_acc),
        )

    lines: list[ShoppingLine] = []
    classes_present = sum([have_mass, have_vol, have_cnt])

    # CASE 1: ONLY one of mass / volume / count present → easy sum
    if classes_present == 1 and not have_unk:
        if have_mass:
            total_g = sum(to_g(qty, ing.unit) or 0 for ing, qty in buckets["mass"])
            lines.append(combined_line(*display_mass(total_g)))
        elif have_vol:
            total_ml = sum(to_ml(qty, ing.unit) or 0 for ing, qty in buckets["volume"])
            lines.append(combined_line(*display_volume(total_ml)))
        else:
            # NOTE(review): counts are summed across different count units
            # (e.g. cloves + heads) and labelled with the first unit seen —
            # confirm this is intended.
            total = sum(qty for _, qty in buckets["count"])
            unit = buckets["count"][0][0].unit or "ea"
            lines.append(combined_line(total, unit))

    # CASE 2: mass + volume (the killer case) → use density if known
    elif have_mass and have_vol and not have_cnt and density:
        total_g = sum(to_g(qty, ing.unit) or 0 for ing, qty in buckets["mass"])
        for ing, qty in buckets["volume"]:
            total_g += (to_ml(qty, ing.unit) or 0) * density
        lines.append(combined_line(*display_mass(total_g)))

    # CASE 3: count + (mass OR volume) → use common_size_g to convert count.
    # Needs a common size, plus a density whenever real volume is involved.
    elif have_cnt and (have_mass or have_vol) and common_size and (not have_vol or density):
        total_g = sum(to_g(qty, ing.unit) or 0 for ing, qty in buckets["mass"])
        for ing, qty in buckets["volume"]:
            # Zero-quantity volume entries can exist even when density is
            # unknown; the fallback factor keeps their 0 ml contribution valid.
            total_g += (to_ml(qty, ing.unit) or 0) * (density or 1.0)
        for _, qty in buckets["count"]:
            total_g += qty * common_size
        lines.append(combined_line(*display_mass(total_g)))

    # CASE 4: anything that can't reconcile (no density, no common size,
    # unknown units mixed in, or nothing quantified) → one line per class
    else:
        lines.extend(_split_lines(food, buckets, contribs, notes_acc))

    # Vague amounts tag onto the first line, or stand alone as "to taste".
    if have_vague:
        if lines:
            # Rebind rather than append: the existing list may be shared with
            # other split lines, and only the first line should get the tag.
            lines[0].notes = [*lines[0].notes, "plus to-taste"]
        else:
            lines.append(ShoppingLine(
                food=food, qty=None, unit="to taste",
                contributors=list(contribs), notes=notes_acc + ["to taste"],
            ))

    # Unknown unit → include verbatim. The ingredient's own qty is reported,
    # so a missing quantity stays None instead of turning into 0.0.
    for ing, _ in buckets["unknown"]:
        lines.append(ShoppingLine(
            food=food, qty=ing.qty, unit=ing.unit or "?",
            contributors=[ing.original_text or _render(ing)], notes=[],
            is_split=True,
        ))

    # Nothing quantified, vague or unknown (e.g. a bare "salt" with no
    # amount): still list the food so the shopper does not miss it.
    if items and not lines:
        lines.append(combined_line(None, ""))

    return lines
|
|
|
|
|
|
def _split_lines(food, buckets, contribs, notes_acc) -> list[ShoppingLine]:
    """Fall-back: emit one shopping line per non-empty unit class.

    Args:
        food: Canonical food name for every emitted line.
        buckets: Maps "mass" / "volume" / "count" to lists of
            ``(ingredient, qty)`` pairs; other keys are ignored here.
        contribs: Contributor texts attached to every emitted line.
        notes_acc: Notes attached to every emitted line.

    Returns:
        Up to three lines (mass, volume, count — in that order), each with
        ``is_split=True``. A class is emitted only if at least one of its
        quantities is non-zero.
    """
    def split_line(qty, unit) -> ShoppingLine:
        # Copy the lists: each line owns its contributors/notes, so mutating
        # one line later cannot change its siblings or the caller's lists.
        return ShoppingLine(
            food=food, qty=qty, unit=unit,
            contributors=list(contribs), notes=list(notes_acc),
            is_split=True,
        )

    out = []
    if any(qty for _, qty in buckets["mass"]):
        total_g = sum(to_g(qty, ing.unit) or 0 for ing, qty in buckets["mass"])
        out.append(split_line(*display_mass(total_g)))
    if any(qty for _, qty in buckets["volume"]):
        total_ml = sum(to_ml(qty, ing.unit) or 0 for ing, qty in buckets["volume"])
        out.append(split_line(*display_volume(total_ml)))
    if any(qty for _, qty in buckets["count"]):
        total = sum(qty for _, qty in buckets["count"])
        # Label with the first count ingredient's unit, "ea" if it has none.
        unit = buckets["count"][0][0].unit or "ea"
        out.append(split_line(total, unit))
    return out
|
|
|
|
|
|
def _render(ing: Ingredient) -> str:
|
|
parts = []
|
|
if ing.qty is not None:
|
|
parts.append(str(ing.qty))
|
|
if ing.unit:
|
|
parts.append(ing.unit)
|
|
parts.append(ing.food_name)
|
|
if ing.note:
|
|
parts.append(f"({ing.note})")
|
|
return " ".join(parts)
|