diff --git a/.env.example b/.env.example
index e813cee..e2318b9 100644
--- a/.env.example
+++ b/.env.example
@@ -31,3 +31,14 @@ CRAFTING_DEFAULT_JOB_TIMEOUT=1800
# Workspace gc — how often to sweep for stale worktrees, and the age cutoff.
CRAFTING_GC_INTERVAL=3600
CRAFTING_GC_AGE=86400
+
+# --- Email digest (optional, off by default) -------------------------------
+# If CRAFTING_SMTP_HOST is empty or unset, the digest scheduler stays disabled
+# and the server logs a "digest disabled" warning at startup. Setting it
+# enables the daily 06:00 PT loop.
+# CRAFTING_SMTP_HOST=postfix.sulkta.com
+# CRAFTING_SMTP_PORT=587
+# CRAFTING_SMTP_USER=crafting-table@sulkta.com
+# CRAFTING_SMTP_PASS=
+# CRAFTING_SMTP_FROM=crafting-table@sulkta.com
+# CRAFTING_SMTP_TLS=1
diff --git a/README.md b/README.md
index 34c6aaa..733e761 100644
--- a/README.md
+++ b/README.md
@@ -24,7 +24,7 @@ Spec: `Sulkta-Coop/openclaw-workspace/memory/spec-crafting-table.md` (LAN-only).
- [ ] Step 5: Per-language parsers (Rust / Python / Go / TS first)
- [ ] Step 6: Findings extraction + storage
- [ ] Step 7: MCP server (stdio JSON-RPC, 8 tools)
-- [ ] Step 8: Email digest scheduler
+- [x] Step 8: Email digest scheduler
- [ ] Step 9: Autonomous patch loop (clawdforge integration)
- [ ] Step 10: Production recipes — clawdforge, cauldron, tradecraft
@@ -222,6 +222,56 @@ pip install -e '.[test]'
pytest tests/
```
+## Digest
+
+Daily 06:00 PT email digest. One digest per project per day: it aggregates the
+last 24h of that project's jobs and sends a copy to each recipient via the SMTP
+relay (Lucy postfix).
+
+Set the SMTP block in `.env` to enable — leaving `CRAFTING_SMTP_HOST` unset
+keeps the scheduler off and logs `digest disabled — CRAFTING_SMTP_HOST not set`
+at startup. The `/digests` and `/admin/digest/run-now` endpoints still work
+in dry-run mode regardless.
+
+```bash
+CRAFTING_SMTP_HOST=postfix.sulkta.com
+CRAFTING_SMTP_PORT=587
+CRAFTING_SMTP_USER=crafting-table@sulkta.com
+CRAFTING_SMTP_PASS=...
+CRAFTING_SMTP_FROM=crafting-table@sulkta.com
+CRAFTING_SMTP_TLS=1
+```
+
+Each project's `notify.email` + `notify.on` fields control delivery:
+
+| `notify.on` event | When it fires |
+|---------------------|--------------------------------------------------------|
+| `audit_pass` | a passing audit job |
+| `audit_fail` | a failing audit job |
+| `test_fail` | a failing test job |
+| `lint_warn` | a lint job with warning-severity findings |
+| `cve_found` | any job whose findings include a `cve` |
+| `patch_drafted` | (wave 3 / step 9) auto-patch was drafted |
+| `nightly_summary` | catch-all — show ALL jobs in the project's section |
+
+Empty `notify.on` defaults to `["audit_fail", "cve_found", "patch_drafted"]`.
+Empty `notify.email` silently excludes the project.
+
+Manual trigger using the LAN admin token:
+
+```bash
+# Render today's digest without sending
+curl -sH "Authorization: Bearer $ADMIN" \
+  -H "Content-Type: application/json" \
+  -X POST http://192.168.0.5:8810/admin/digest/run-now \
+  -d '{"dry_run": true}' | jq .
+
+# Render an arbitrary date as JSON
+curl -sH "Authorization: Bearer $ADMIN" \
+ http://192.168.0.5:8810/digests/2026-04-29 | jq .
+```
+
+Idempotency: `digest_runs` table holds `UNIQUE(date, project_name)`, so the
+06:00 loop is safe to re-fire on the same day — only the first call sends.
+
## License
MIT
diff --git a/crafting_table/db.py b/crafting_table/db.py
index 8676155..fb1bd4d 100644
--- a/crafting_table/db.py
+++ b/crafting_table/db.py
@@ -112,6 +112,21 @@ MIGRATIONS: list[tuple[str, str]] = [
CREATE INDEX IF NOT EXISTS idx_findings_fingerprint ON findings(fingerprint);
""",
),
+ (
+ "006_digest_runs",
+ """
+ CREATE TABLE IF NOT EXISTS digest_runs (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ date TEXT NOT NULL,
+ project_name TEXT NOT NULL,
+ sent_at INTEGER NOT NULL,
+ recipient_count INTEGER NOT NULL,
+ job_count INTEGER NOT NULL,
+ UNIQUE(date, project_name)
+ );
+ CREATE INDEX IF NOT EXISTS idx_digest_runs_date ON digest_runs(date);
+ """,
+ ),
]
# fmt: on
@@ -489,6 +504,52 @@ class DB:
).fetchall()
return [dict(r) for r in rows]
+ # ---------- digest runs --------------------------------------------------
+
+    def record_digest_run(
+        self,
+        *,
+        date: str,
+        project_name: str,
+        sent_at: int,
+        recipient_count: int,
+        job_count: int,
+    ) -> bool:
+        """Insert a digest_runs row for (date, project_name).
+
+        INSERT OR IGNORE together with the table's UNIQUE(date, project_name)
+        constraint makes a repeat call for the same pair a no-op.
+
+        Returns:
+            True when a new row was written, False when the pair was already
+            recorded.
+        """
+        row = (date, project_name, sent_at, recipient_count, job_count)
+        with self._conn() as conn:
+            cursor = conn.execute(
+                """
+ INSERT OR IGNORE INTO digest_runs
+ (date, project_name, sent_at, recipient_count, job_count)
+ VALUES (?, ?, ?, ?, ?)
+ """,
+                row,
+            )
+            # rowcount is 1 for a fresh insert and 0 when the row was ignored.
+            inserted = cursor.rowcount == 1
+            return inserted
+
+    def digest_run_exists(self, date: str, project_name: str) -> bool:
+        """Return True if digest_runs already holds a row for (date, project_name)."""
+        lookup = "SELECT 1 FROM digest_runs WHERE date=? AND project_name=?"
+        with self._conn() as conn:
+            hit = conn.execute(lookup, (date, project_name)).fetchone()
+        return hit is not None
+
+    def list_digest_runs(self, date: str | None = None) -> list[dict]:
+        """Return digest_runs rows as plain dicts.
+
+        With no date, every row is returned, newest date first and then by
+        project name. With a date, only that day's rows are returned, ordered
+        by project name.
+        """
+        if date is None:
+            sql = "SELECT * FROM digest_runs ORDER BY date DESC, project_name"
+            args: tuple = ()
+        else:
+            sql = "SELECT * FROM digest_runs WHERE date=? ORDER BY project_name"
+            args = (date,)
+        with self._conn() as conn:
+            fetched = conn.execute(sql, args).fetchall()
+        return [dict(record) for record in fetched]
+
# ---------- async wrappers ----------------------------------------------
async def arun(self, fn, *args, **kwargs):
diff --git a/crafting_table/digest.py b/crafting_table/digest.py
new file mode 100644
index 0000000..7317288
--- /dev/null
+++ b/crafting_table/digest.py
@@ -0,0 +1,551 @@
+"""Email digest scheduler.
+
+Aggregates the last 24h of jobs per project, builds a text + HTML email body,
+and sends it via SMTP. Runs daily at 06:00 PT (configurable). Each project's
+`notify.on` event filter controls which job outcomes show up in its digest.
+
+Design notes:
+- We use a sleep-until-next-window asyncio loop instead of cron — keeps the
+ scheduler in-process so we can dry-run via the API for testing.
+- Idempotency is enforced by a `digest_runs` table with UNIQUE(date, project_name).
+ Calling run_once twice for the same date will only send one email per project.
+- SMTP send is done via stdlib smtplib (sync) wrapped in run_in_executor so
+ we don't block the loop while postfix grumbles.
+- If SMTP isn't configured, the server lifespan logs a warning and skips
+ scheduler startup. The /digests endpoints still work for dry-run rendering.
+- Patch-drafted / auto-patches / bugs.sulkta.com numbers are zero-state
+ placeholders for v0.1 wave 2C — wave 3 / step 9 wires them.
+"""
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+import os
+import smtplib
+import time
+from dataclasses import dataclass, field
+from datetime import datetime, timedelta
+from email.message import EmailMessage
+from pathlib import Path
+from typing import Iterable
+from zoneinfo import ZoneInfo
+
+from .db import DB
+
+
+log = logging.getLogger("crafting_table.digest")
+
+
+# Recognized notify.on event tags, documented for callers. Nothing visible in
+# this module validates against the tuple: _job_event_tags builds tag strings
+# directly from each job's recipe and status.
+VALID_NOTIFY_EVENTS = (
+ "audit_pass",
+ "audit_fail",
+ "test_fail",
+ "lint_warn",
+ "cve_found",
+ "patch_drafted",
+ "nightly_summary",
+)
+
+# Default events when a project has notify.email but empty notify.on.
+DEFAULT_NOTIFY_ON = ("audit_fail", "cve_found", "patch_drafted")
+
+
+@dataclass(frozen=True)
+class SmtpConfig:
+    """Immutable SMTP relay settings for the digest sender.
+
+    Fields mirror the CRAFTING_SMTP_* environment variables read by
+    from_env(); the defaults here apply when a variable is absent.
+    """
+
+    host: str
+    port: int = 587
+    username: str | None = None
+    password: str | None = None
+    from_addr: str = "crafting-table@localhost"
+    use_tls: bool = True
+
+    @classmethod
+    def from_env(cls) -> "SmtpConfig | None":
+        """Build a config from the CRAFTING_SMTP_* environment variables.
+
+        Returns:
+            None if CRAFTING_SMTP_HOST is missing or blank — the caller treats
+            that as "digest disabled". Otherwise a populated SmtpConfig.
+
+        Raises:
+            ValueError: CRAFTING_SMTP_PORT is set to a non-integer value.
+        """
+        env = os.environ
+        host = env.get("CRAFTING_SMTP_HOST", "").strip()
+        if not host:
+            return None
+        # A blank PORT or FROM (e.g. `CRAFTING_SMTP_PORT=` with no value)
+        # falls back to the default instead of crashing int("") or producing
+        # an empty From header.
+        port_raw = env.get("CRAFTING_SMTP_PORT", "").strip() or "587"
+        from_addr = env.get("CRAFTING_SMTP_FROM", "").strip() or "crafting-table@localhost"
+        # Compared case-insensitively so "False" / "NO" disable TLS exactly
+        # like "false" / "no". An explicitly empty value still disables TLS.
+        tls_raw = env.get("CRAFTING_SMTP_TLS", "1").strip().lower()
+        return cls(
+            host=host,
+            port=int(port_raw),
+            username=env.get("CRAFTING_SMTP_USER") or None,
+            password=env.get("CRAFTING_SMTP_PASS") or None,
+            from_addr=from_addr,
+            use_tls=tls_raw not in ("0", "false", "no", ""),
+        )
+
+
+# --- helpers ----------------------------------------------------------------
+
+
+def _job_event_tags(job: dict, findings: list[dict]) -> set[str]:
+    """Map a job + its findings to notify.on event tags.
+
+    Tags produced:
+    - status "succeeded"            -> "{recipe}_pass"
+    - status "failed" / "timed_out" -> "{recipe}_fail"
+    - lint job with a warning-severity finding -> "lint_warn"
+    - any finding with kind == "cve" -> "cve_found"
+    patch_drafted is reserved for wave 3 / step 9 and is never emitted here.
+
+    Args:
+        job: job row; must contain "recipe" and "status".
+        findings: finding rows for that job; "severity" and "kind" are read
+            with .get(), so missing keys are tolerated.
+
+    Returns:
+        The set of event tags; empty for statuses other than the ones above
+        when no finding-based tag applies.
+    """
+    tags: set[str] = set()
+    recipe = job["recipe"]
+    status = job["status"]
+    if status == "succeeded":
+        tags.add(f"{recipe}_pass")
+    elif status in ("failed", "timed_out"):
+        tags.add(f"{recipe}_fail")
+
+    # Accept both spellings, matching _outcome_glyph and _summarize_job;
+    # otherwise a job rendered with the warning glyph could be filtered out
+    # of a lint_warn subscription.
+    has_warn = any(f.get("severity") in ("warn", "warning") for f in findings)
+    if recipe == "lint" and has_warn:
+        tags.add("lint_warn")
+
+    if any(f.get("kind") == "cve" for f in findings):
+        tags.add("cve_found")
+
+    return tags
+
+
+def _outcome_glyph(job: dict, findings: list[dict]) -> str:
+    """Pick the leading marker for a run line.
+
+    "✗" for failed / timed_out / cancelled jobs, "⚠" for any other job that
+    has at least one warn/warning-severity finding, "✓" otherwise.
+    """
+    if job["status"] in ("failed", "timed_out", "cancelled"):
+        return "✗"
+    for finding in findings:
+        if finding.get("severity") in ("warn", "warning"):
+            return "⚠"
+    return "✓"
+
+
+def _summarize_job(job: dict, findings: list[dict]) -> str:
+    """Return the short trailing summary shown on a job's digest line.
+
+    Failed / timed-out jobs: the first line (max 80 chars) of the first
+    error/high/critical finding's message, or "<recipe> failed" when that
+    message is blank, or "<recipe> failed (exit=N)" when no such finding
+    exists. Other jobs: recipe-specific counts of findings, warnings and CVEs.
+
+    Args:
+        job: job row; must contain "recipe" and "status", may contain
+            "exit_code".
+        findings: finding rows for that job.
+    """
+    recipe = job["recipe"]
+    status = job["status"]
+    if status in ("failed", "timed_out"):
+        # Pull a short reason from the first error finding if any exist.
+        err = next((f for f in findings if f.get("severity") in ("error", "high", "critical")), None)
+        if err:
+            # "".splitlines() is [], so guard before indexing: a finding with
+            # an empty or None message must not crash the whole digest.
+            msg_lines = (err.get("message") or "").splitlines()
+            msg = msg_lines[0][:80] if msg_lines else ""
+            return msg or f"{recipe} failed"
+        return f"{recipe} failed (exit={job.get('exit_code')})"
+
+    n_findings = len(findings)
+    n_warn = sum(1 for f in findings if f.get("severity") in ("warn", "warning"))
+    n_cve = sum(1 for f in findings if f.get("kind") == "cve")
+    if recipe == "lint":
+        return f"0 errors, {n_warn} warnings"
+    if recipe == "audit":
+        return f"{n_cve} CVEs, {n_warn} warnings"
+    if recipe == "test":
+        return f"{n_findings} findings, {n_warn} warnings"
+    return f"{n_findings} findings"
+
+
+def _filter_for_project(jobs_with_findings: list[tuple[dict, list[dict]]], notify_on: list[str]) -> list[tuple[dict, list[dict]]]:
+    """Keep only the (job, findings) pairs a project subscribed to.
+
+    An empty notify_on falls back to DEFAULT_NOTIFY_ON. If the effective list
+    contains "nightly_summary", every pair is returned. Otherwise a pair is
+    kept when its event tags overlap the effective list.
+    """
+    events = notify_on or list(DEFAULT_NOTIFY_ON)
+    if "nightly_summary" in events:
+        return list(jobs_with_findings)
+    wanted = set(events)
+    return [
+        (job, findings)
+        for job, findings in jobs_with_findings
+        if _job_event_tags(job, findings) & wanted
+    ]
+
+
+# --- rendering --------------------------------------------------------------
+
+
+def _render_text(date_str: str, sections: list[dict], full_log_url: str) -> str:
+    """Render the plain-text digest body.
+
+    Layout: a subject-style headline with run / patch / CVE totals, one line
+    per run under "Overnight runs", a fixed "Open follow-ups" block, and the
+    full-log link. Each section dict supplies "project", "runs" and "cves".
+    """
+    run_total = sum(len(sec["runs"]) for sec in sections)
+    cve_total = sum(sec["cves"] for sec in sections)
+    drafted_total = 0  # placeholder, wave 3
+    plural = "" if run_total == 1 else "s"
+    builds = f"{run_total} build" + plural
+
+    out = [
+        f"Subject: crafting-table digest — {date_str} ({builds}, {drafted_total} patches drafted, {cve_total} CVEs)",
+        "",
+        "Overnight runs (last 24h):",
+    ]
+    if sections:
+        for sec in sections:
+            for run in sec["runs"]:
+                mark = run["glyph"]
+                label = f"{sec['project']}::{run['subproject']}"
+                out.append(
+                    f" {mark} {label:<32s} {run['recipe']:<6s} {run['status']:<5s} ({run['summary']})"
+                )
+    else:
+        out.append(" (no activity)")
+    out += [
+        "",
+        "Open follow-ups:",
+        " - 0 unmerged auto-patches",
+        " - 0 manual review tickets in bugs.sulkta.com",
+        "",
+        f"Full log: {full_log_url}",
+    ]
+    return "\n".join(out) + "\n"
+
+
+def _render_html(date_str: str, sections: list[dict], full_log_url: str) -> str:
+ """Build the HTML body. Same content, table styling, monospace font."""
+ total_runs = sum(len(s["runs"]) for s in sections)
+ total_drafted = 0
+ total_cves = sum(s["cves"] for s in sections)
+
+ rows = []
+ for s in sections:
+ for run in s["runs"]:
+ proj_sub = f"{s['project']}::{run['subproject']}"
+ rows.append(
+ f"
| {run['glyph']} | {proj_sub} | "
+ f"{run['recipe']} | {run['status']} | "
+ f"{run['summary']} |
"
+ )
+ if not rows:
+ rows.append('| (no activity) |
')
+
+ return f"""
+
+crafting-table digest — {date_str}
+({total_runs} runs, {total_drafted} patches drafted, {total_cves} CVEs)
+Overnight runs (last 24h)
+
+ | project | recipe | status | summary |
+
+{''.join(rows)}
+
+
+Open follow-ups
+
+- 0 unmerged auto-patches
+- 0 manual review tickets in bugs.sulkta.com
+
+
+
+"""
+
+
+# --- scheduler --------------------------------------------------------------
+
+
+class DigestScheduler:
+    """Daily 06:00 PT digest. Aggregates 24h of jobs per project + sends SMTP.
+
+    Lifecycle:
+        scheduler = DigestScheduler(db, smtp_config)
+        await scheduler.start()
+        ...
+        await scheduler.stop()
+
+    Manual trigger via run_once(...). Pass dry_run=True to render without
+    sending — the rendered bodies are still returned so the /digests endpoint
+    works.
+    """
+
+    def __init__(
+        self,
+        db: DB,
+        smtp: SmtpConfig | None,
+        *,
+        time_zone: str = "America/Los_Angeles",
+        hour: int = 6,
+        minute: int = 0,
+        full_log_base_url: str = "http://192.168.0.5:8810/digests",
+    ):
+        """Store collaborators and schedule settings; no task is started here.
+
+        Args:
+            db: database handle used for projects, jobs, findings and
+                digest_runs bookkeeping.
+            smtp: relay settings, or None to disable sending (rendering and
+                dry runs still work).
+            time_zone: IANA zone name in which hour/minute are interpreted.
+            hour: local hour of the daily window.
+            minute: local minute of the daily window.
+            full_log_base_url: prefix for the "Full log:" link; the digest
+                date is appended as a path segment.
+        """
+        self.db = db
+        self.smtp = smtp
+        self.tz = ZoneInfo(time_zone)
+        self.hour = hour
+        self.minute = minute
+        self.full_log_base_url = full_log_base_url
+
+        self._loop_task: asyncio.Task | None = None
+        self._stopping = False
+
+    async def start(self) -> None:
+        """Spawn the background loop task; a no-op if one already exists."""
+        if self._loop_task is not None:
+            return
+        self._stopping = False
+        self._loop_task = asyncio.create_task(self._loop())
+
+    async def stop(self) -> None:
+        """Cancel the background loop task, if any, and wait for it to exit."""
+        self._stopping = True
+        if self._loop_task is not None:
+            self._loop_task.cancel()
+            try:
+                await self._loop_task
+            except asyncio.CancelledError:
+                pass
+            self._loop_task = None
+
+    async def _loop(self) -> None:
+        """Sleep until each window, run the digest, and repeat until stopped.
+
+        Cancellation (from stop()) propagates out of either sleep untouched.
+        Any other exception from run_once is logged and the loop continues.
+        """
+        while not self._stopping:
+            await asyncio.sleep(self._seconds_until_next_window())
+            if self._stopping:
+                break
+            try:
+                await self.run_once()
+            except Exception as e:
+                log.exception("digest run_once failed: %s", e)
+            # Sleep ~1 minute past the trigger so the next call to
+            # _seconds_until_next_window doesn't re-fire this same window.
+            await asyncio.sleep(60)
+
+    def _seconds_until_next_window(self, now: datetime | None = None) -> float:
+        """Return the real seconds until the next HH:MM in our timezone.
+
+        If today's window has already passed, the target is tomorrow's.
+
+        Args:
+            now: reference instant (aware datetime); defaults to the current
+                time. Exposed so the computation can be checked
+                deterministically.
+
+        Returns:
+            Seconds to wait, never less than 1.0.
+        """
+        current = datetime.now(self.tz) if now is None else now.astimezone(self.tz)
+        target = current.replace(hour=self.hour, minute=self.minute, second=0, microsecond=0)
+        if target <= current:
+            target = target + timedelta(days=1)
+        # Subtracting two aware datetimes that share a tzinfo is wall-clock
+        # arithmetic and ignores UTC-offset changes, so across a DST switch
+        # it is off by an hour. POSIX timestamps give the true elapsed time.
+        return max(1.0, target.timestamp() - current.timestamp())
+
+    # ---------- digest core ------------------------------------------------
+
+    async def run_once(
+        self,
+        *,
+        target_date: str | None = None,
+        dry_run: bool = False,
+        now_ts: int | None = None,
+    ) -> dict:
+        """Build + (maybe) send the digest.
+
+        The synchronous work (DB reads, rendering, SMTP) runs in the default
+        executor so the event loop is not blocked.
+
+        target_date: 'YYYY-MM-DD' in the configured tz. Defaults to today.
+        dry_run: skip the SMTP send and record nothing in digest_runs.
+        now_ts: override "now" for tests; defaults to time.time().
+
+        Returns a summary:
+            {
+                "date": "YYYY-MM-DD",
+                "dry_run": bool,
+                "projects": [
+                    {"project": str, "recipients": [...], "job_count": int,
+                     "sent": bool, "skipped_reason": str|None}
+                ],
+                "text": "",
+                "html": "",
+                "sections": [...],
+                "full_log_url": str,
+            }
+
+        Raises:
+            ValueError: target_date is not a valid 'YYYY-MM-DD' date.
+        """
+        # get_running_loop() is the supported call inside a coroutine;
+        # get_event_loop() is deprecated for this use.
+        loop = asyncio.get_running_loop()
+        return await loop.run_in_executor(
+            None,
+            lambda: self._run_once_sync(target_date=target_date, dry_run=dry_run, now_ts=now_ts),
+        )
+
+    def _run_once_sync(
+        self,
+        *,
+        target_date: str | None,
+        dry_run: bool,
+        now_ts: int | None,
+    ) -> dict:
+        """Blocking implementation behind run_once; see run_once for the contract."""
+        if now_ts is None:
+            now_ts = int(time.time())
+
+        # Window: last 24h ending at "now", or at the end of target_date when
+        # backfilling (local midnight + 1 day, so backfills are reproducible).
+        if target_date is None:
+            date_str = datetime.fromtimestamp(now_ts, self.tz).strftime("%Y-%m-%d")
+            window_end = now_ts
+        else:
+            date_str = target_date
+            day = datetime.strptime(target_date, "%Y-%m-%d").replace(tzinfo=self.tz)
+            window_end = int((day + timedelta(days=1)).timestamp())
+        window_start = window_end - 24 * 3600
+
+        # Pull all projects + their in-window jobs + findings.
+        projects = self.db.list_projects()
+        per_project_sections: list[dict] = []
+        per_project_meta: list[dict] = []
+        # (meta, section) pairs that passed the include rules, in project order.
+        to_send: list[tuple[dict, dict]] = []
+        full_log_url = f"{self.full_log_base_url}/{date_str}"
+
+        for prow in projects:
+            recipe = json.loads(prow.get("recipe_json") or "{}")
+            notify = recipe.get("notify") or {}
+            recipients: list[str] = list(notify.get("email") or [])
+            notify_on: list[str] = list(notify.get("on") or [])
+
+            # Only the 500 rows list_jobs returns are considered.
+            jobs = self.db.list_jobs(project_name=prow["name"], limit=500)
+            # Window filter — finished_at if present, else queued_at.
+            in_window = []
+            for j in jobs:
+                ts = j.get("finished_at") or j.get("queued_at") or 0
+                if window_start <= ts <= window_end:
+                    in_window.append(j)
+            with_findings: list[tuple[dict, list[dict]]] = [
+                (j, self.db.list_findings(j["id"])) for j in in_window
+            ]
+            filtered = _filter_for_project(with_findings, notify_on)
+
+            section_runs: list[dict] = []
+            cves = 0
+            for job, findings in filtered:
+                if job["status"] == "succeeded":
+                    status_label = "pass"
+                elif job["status"] in ("failed", "timed_out", "cancelled"):
+                    status_label = "fail"
+                else:
+                    status_label = "warn"
+                section_runs.append({
+                    "subproject": job["subproject_path"],
+                    "recipe": job["recipe"],
+                    "status": status_label,
+                    "summary": _summarize_job(job, findings),
+                    "glyph": _outcome_glyph(job, findings),
+                })
+                cves += sum(1 for f in findings if f.get("kind") == "cve")
+
+            section = {
+                "project": prow["name"],
+                "runs": section_runs,
+                "cves": cves,
+            }
+
+            meta = {
+                "project": prow["name"],
+                "recipients": recipients,
+                "job_count": len(filtered),
+                "sent": False,
+                "skipped_reason": None,
+            }
+            per_project_meta.append(meta)
+
+            # Include rules:
+            # - no recipients -> never include or send
+            # - recipients + activity -> include + send
+            # - recipients + no activity + nightly_summary -> include + send
+            # - recipients + no activity + no nightly_summary -> skip silently
+            wants_summary = "nightly_summary" in notify_on
+            if not recipients:
+                meta["skipped_reason"] = "no_recipients"
+                continue
+            if not section_runs and not wants_summary:
+                meta["skipped_reason"] = "zero_activity"
+                continue
+
+            per_project_sections.append(section)
+            to_send.append((meta, section))
+
+        # Combined bodies (all included projects) are what the caller gets back.
+        text_body = _render_text(date_str, per_project_sections, full_log_url)
+        html_body = _render_html(date_str, per_project_sections, full_log_url)
+
+        # Per-project send loop. Idempotency check via digest_runs UNIQUE.
+        for meta, section in to_send:
+            project_name = meta["project"]
+            if not dry_run:
+                already = self.db.digest_run_exists(date_str, project_name)
+                if already:
+                    meta["skipped_reason"] = "already_sent"
+                    continue
+
+            # Build a per-project-scoped body.
+            proj_text = _render_text(date_str, [section], full_log_url)
+            proj_html = _render_html(date_str, [section], full_log_url)
+            subject = (
+                f"crafting-table digest — {date_str} "
+                f"({len(section['runs'])} runs, 0 patches drafted, {section['cves']} CVEs)"
+            )
+
+            if dry_run or self.smtp is None:
+                meta["sent"] = False
+                if self.smtp is None:
+                    meta["skipped_reason"] = "smtp_disabled"
+                continue
+
+            try:
+                self._send_email(
+                    recipients=meta["recipients"],
+                    subject=subject,
+                    text=proj_text,
+                    html=proj_html,
+                )
+                meta["sent"] = True
+                # Recorded only after every recipient was sent to, so a
+                # failed send is retried by the next run for this date.
+                self.db.record_digest_run(
+                    date=date_str,
+                    project_name=project_name,
+                    sent_at=int(time.time()),
+                    recipient_count=len(meta["recipients"]),
+                    job_count=meta["job_count"],
+                )
+            except Exception as e:
+                log.exception("digest SMTP send failed for %s: %s", project_name, e)
+                meta["sent"] = False
+                meta["skipped_reason"] = f"smtp_error:{type(e).__name__}"
+
+        return {
+            "date": date_str,
+            "dry_run": dry_run,
+            "projects": per_project_meta,
+            "text": text_body,
+            "html": html_body,
+            "sections": per_project_sections,
+            "full_log_url": full_log_url,
+        }
+
+    # ---------- SMTP send --------------------------------------------------
+
+    def _send_email(
+        self,
+        *,
+        recipients: list[str],
+        subject: str,
+        text: str,
+        html: str,
+    ) -> None:
+        """Send the digest to each recipient over a single SMTP connection.
+
+        One separate text+HTML message is built per address, with only that
+        address in its To: header, so recipients do not see each other.
+
+        An exception while sending to any address propagates immediately;
+        later addresses are not attempted and earlier ones have already been
+        delivered.
+
+        Raises:
+            RuntimeError: no SMTP configuration is set.
+            smtplib.SMTPException / OSError: connection, login or send failure.
+        """
+        if self.smtp is None:
+            raise RuntimeError("smtp not configured")
+
+        with smtplib.SMTP(self.smtp.host, self.smtp.port, timeout=30) as conn:
+            if self.smtp.use_tls:
+                try:
+                    conn.starttls()
+                except smtplib.SMTPNotSupportedError:
+                    # NOTE(review): falling back means the login below sends
+                    # credentials unencrypted — confirm this is acceptable
+                    # for the relay in use.
+                    log.warning("SMTP server does not support STARTTLS; sending cleartext")
+            if self.smtp.username and self.smtp.password:
+                conn.login(self.smtp.username, self.smtp.password)
+            for addr in recipients:
+                msg = EmailMessage()
+                msg["Subject"] = subject
+                msg["From"] = self.smtp.from_addr
+                msg["To"] = addr
+                msg.set_content(text)
+                msg.add_alternative(html, subtype="html")
+                conn.send_message(msg)
diff --git a/crafting_table/models.py b/crafting_table/models.py
index 48b45c4..431ed96 100644
--- a/crafting_table/models.py
+++ b/crafting_table/models.py
@@ -42,6 +42,21 @@ class Schedule(BaseModel):
class Notify(BaseModel):
+ """Per-project email digest preferences.
+
+ `email` — recipient list. Empty list silently excludes the project from
+ every digest.
+
+ `on` — event filter for which job outcomes show up in this project's
+ digest section. Recognized tags (see crafting_table.digest):
+ audit_pass / audit_fail / test_fail / lint_warn / cve_found /
+ patch_drafted (wave 3) / nightly_summary (show all)
+ Empty list defaults to `["audit_fail", "cve_found", "patch_drafted"]`
+ at digest-render time.
+
+ `auto_patch` — wave 3 / step 9 toggle for the autonomous patch loop.
+ """
+
email: list[str] = Field(default_factory=list)
on: list[str] = Field(default_factory=lambda: ["audit_fail", "cve_found", "patch_drafted"])
auto_patch: bool = False
diff --git a/crafting_table/server.py b/crafting_table/server.py
index 602ce90..81115f2 100644
--- a/crafting_table/server.py
+++ b/crafting_table/server.py
@@ -42,6 +42,7 @@ from fastapi.responses import FileResponse, JSONResponse
from .auth import Auth, AppToken
from .config import Config, load
from .db import DB
+from .digest import DigestScheduler, SmtpConfig
from .models import (
CreateJobRequest,
Project,
@@ -71,6 +72,10 @@ runner: Runner = Runner(
gc_interval_secs=cfg.workspace_gc_interval_secs,
gc_age_secs=cfg.workspace_gc_age_secs,
)
+# SMTP-driven; if CRAFTING_SMTP_HOST is unset, _smtp_cfg is None. The scheduler
+# object is still constructed so the endpoints can render in dry-run, but the
+# lifespan never starts its loop and logs a "digest disabled" warning.
+_smtp_cfg: SmtpConfig | None = SmtpConfig.from_env()
+digest_scheduler: DigestScheduler = DigestScheduler(db=db, smtp=_smtp_cfg)
# ---------- lifespan --------------------------------------------------------
@@ -80,6 +85,14 @@ runner: Runner = Runner(
async def _lifespan(app: FastAPI):
auth.bootstrap_admin(cfg.admin_bearer_path)
await runner.start()
+ if _smtp_cfg is None:
+ log.warning("digest disabled — CRAFTING_SMTP_HOST not set")
+ else:
+ await digest_scheduler.start()
+ log.info(
+ "digest scheduler started: smtp=%s:%d from=%s",
+ _smtp_cfg.host, _smtp_cfg.port, _smtp_cfg.from_addr,
+ )
log.info(
"crafting-table startup: db=%s log_dir=%s max_concurrent=%d port=%d",
cfg.db_path, cfg.log_dir, cfg.max_concurrent_jobs, cfg.api_port,
@@ -87,6 +100,7 @@ async def _lifespan(app: FastAPI):
try:
yield
finally:
+ await digest_scheduler.stop()
await runner.stop()
log.info("crafting-table shutdown complete")
@@ -459,6 +473,43 @@ async def get_job_findings(
return {"ok": True, "findings": findings}
+# ---- /digests --------------------------------------------------------------
+
+
+@app.get("/digests/{date}")
+async def get_digest(
+    date: str,
+    request: Request,
+    authorization: Annotated[str | None, Header()] = None,
+):
+    """Render the digest body for a given date (YYYY-MM-DD in PT) without
+    sending. Admin token required. Wave 2C: this is a dry-run rendering pass
+    so the email's "Full log:" link resolves; future waves can persist
+    rendered bodies to a blob store and pull them here for archive replay.
+
+    Raises:
+        HTTPException(400): `date` is not a real calendar date in
+            YYYY-MM-DD form.
+    """
+    # Local import so this handler does not depend on the module's import list.
+    from datetime import datetime
+
+    auth.require_admin(request, authorization)
+    # Shape check, then a real parse: "abcd-ef-gh" or "2026-13-40" pass the
+    # shape test but would otherwise surface as a 500 from strptime inside
+    # run_once.
+    if len(date) != 10 or date[4] != "-" or date[7] != "-":
+        raise HTTPException(400, "date must be YYYY-MM-DD")
+    try:
+        datetime.strptime(date, "%Y-%m-%d")
+    except ValueError:
+        raise HTTPException(400, "date must be YYYY-MM-DD") from None
+    summary = await digest_scheduler.run_once(target_date=date, dry_run=True)
+    return {"ok": True, "digest": summary}
+
+
+@app.post("/admin/digest/run-now")
+async def admin_digest_run_now(
+    request: Request,
+    authorization: Annotated[str | None, Header()] = None,
+    body: dict | None = None,
+):
+    """Manual digest trigger. Admin only.
+
+    Body (JSON object, optional):
+        dry_run: truthy to skip SMTP and record nothing (default false).
+        date: 'YYYY-MM-DD' to build the digest for; defaults to today.
+
+    Raises:
+        HTTPException(400): `date` is present but not a valid YYYY-MM-DD
+            string.
+    """
+    # Local import so this handler does not depend on the module's import list.
+    from datetime import datetime
+
+    auth.require_admin(request, authorization)
+    body = body or {}
+    dry_run = bool(body.get("dry_run", False))
+    target_date = body.get("date")
+    if target_date is not None:
+        # Reject bad dates here; otherwise strptime fails inside run_once and
+        # the client sees a 500.
+        if not isinstance(target_date, str):
+            raise HTTPException(400, "date must be YYYY-MM-DD")
+        try:
+            datetime.strptime(target_date, "%Y-%m-%d")
+        except ValueError:
+            raise HTTPException(400, "date must be YYYY-MM-DD") from None
+    summary = await digest_scheduler.run_once(target_date=target_date, dry_run=dry_run)
+    return {"ok": True, "digest": summary}
+
+
# ---------- helpers ---------------------------------------------------------
diff --git a/tests/test_db.py b/tests/test_db.py
index c310327..528f794 100644
--- a/tests/test_db.py
+++ b/tests/test_db.py
@@ -9,12 +9,13 @@ import pytest
def test_migrations_applied(db_only):
versions = db_only.applied_migrations()
- # All 5 migrations from the MIGRATIONS list should land on first boot.
+ # All MIGRATIONS entries should land on first boot.
assert "001_schema_migrations" in versions
assert "002_tokens" in versions
assert "003_projects" in versions
assert "004_jobs" in versions
assert "005_findings" in versions
+ assert "006_digest_runs" in versions
def test_migrations_idempotent(db_only):
@@ -33,7 +34,7 @@ def test_schema_has_required_tables(db_only):
"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
).fetchall()
names = {r["name"] for r in rows}
- for required in ("schema_migrations", "tokens", "projects", "jobs", "findings"):
+ for required in ("schema_migrations", "tokens", "projects", "jobs", "findings", "digest_runs"):
assert required in names, f"missing table: {required}"
diff --git a/tests/test_digest.py b/tests/test_digest.py
new file mode 100644
index 0000000..5c2a5f1
--- /dev/null
+++ b/tests/test_digest.py
@@ -0,0 +1,352 @@
+"""Email digest scheduler tests.
+
+Cover:
+- 24h aggregation across multiple projects
+- notify.on event filtering
+- skipping projects with no notify.email
+- SMTP send is invoked once per recipient (mocked smtplib.SMTP)
+- zero-state behavior (skipped unless nightly_summary requested)
+- idempotency via digest_runs UNIQUE constraint
+- POST /admin/digest/run-now endpoint
+- GET /digests/{date} endpoint (admin-only access, malformed-date rejection)
+
+We don't boot the full server unless an HTTP-level surface is being exercised.
+For the unit-style tests we instantiate DigestScheduler directly against a
+fresh DB.
+"""
+from __future__ import annotations
+
+import json
+import time
+from unittest import mock
+
+import pytest
+
+from tests.conftest import sample_project_payload
+
+
+# ---------- helpers --------------------------------------------------------
+
+
+def _seed_project(db, *, name: str, owner_token: str = "owner-x",
+                  email: list[str] | None = None,
+                  notify_on: list[str] | None = None) -> None:
+    """Insert an owner token plus a minimal single-subproject python project.
+
+    ``email`` and ``notify_on`` populate the recipe's ``notify`` block; each
+    defaults to an empty list when omitted.
+    """
+    recipients = [] if email is None else email
+    events = [] if notify_on is None else notify_on
+    recipe_doc = {
+        "languages": ["python"],
+        "subprojects": [{"path": ".", "language": "python"}],
+        "schedule": {},
+        "notify": {"email": recipients, "on": events, "auto_patch": False},
+    }
+    # The token is inserted before the project row that names it as owner.
+    bearer_value = f"t-{owner_token}-{name}"
+    db.insert_token(name=owner_token, bearer=bearer_value,
+                    is_admin=False, ip_cidrs=None)
+    db.upsert_project(
+        name=name,
+        git_url="g",
+        default_branch="main",
+        recipe_json=json.dumps(recipe_doc),
+        owner_token=owner_token,
+    )
+
+
+def _seed_job(db, *, project_name: str, job_id: str,
+              recipe: str = "audit", status: str = "succeeded",
+              hours_ago: float = 1.0, exit_code: int = 0,
+              subproject_path: str = ".",
+              findings: list[dict] | None = None) -> None:
+    """Create a job row and backdate it so it finished ``hours_ago`` hours ago.
+
+    Every dict in ``findings`` becomes one finding row; keys that are missing
+    fall back to lint/warn defaults. The log path is just a string, no file is
+    written.
+    """
+    db.insert_job(
+        job_id=job_id,
+        project_name=project_name,
+        subproject_path=subproject_path,
+        recipe=recipe,
+        branch="main",
+        log_path=f"/tmp/{job_id}.log",
+        recipe_snapshot_json="{}",
+    )
+    # Rewrite the timestamps in place: queued 60s and started 30s before the
+    # backdated finish time.
+    finished_at = int(time.time() - hours_ago * 3600)
+    backdate_params = (
+        finished_at - 60,
+        finished_at - 30,
+        finished_at,
+        status,
+        exit_code,
+        job_id,
+    )
+    with db._conn() as conn:
+        conn.execute(
+            "UPDATE jobs SET queued_at=?, started_at=?, finished_at=?, status=?, exit_code=? WHERE id=?",
+            backdate_params,
+        )
+    for entry in findings or []:
+        finding_kind = entry.get("kind", "lint")
+        db.insert_finding(
+            job_id=job_id,
+            kind=finding_kind,
+            severity=entry.get("severity", "warn"),
+            file=entry.get("file"),
+            line=entry.get("line"),
+            code=entry.get("code"),
+            message=entry.get("message", "x"),
+            fingerprint=entry.get("fingerprint", f"fp-{job_id}-{finding_kind}"),
+            suggested_fix=entry.get("suggested_fix"),
+        )
+
+
+# ---------- direct-call tests ---------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_digest_aggregates_jobs_in_window(db_only):
+    """Recent jobs are grouped per project; a 36h-old job is left out."""
+    from crafting_table.digest import DigestScheduler, SmtpConfig
+
+    for proj, owner in (("alpha", "oa"), ("beta", "ob")):
+        _seed_project(db_only, name=proj, owner_token=owner,
+                      email=["cobb@sulkta.com"], notify_on=["nightly_summary"])
+
+    seeded_jobs = [
+        dict(project_name="alpha", job_id="a-pass",
+             recipe="audit", status="succeeded", hours_ago=2),
+        dict(project_name="alpha", job_id="a-fail",
+             recipe="audit", status="failed", hours_ago=3, exit_code=1),
+        dict(project_name="beta", job_id="b-warn",
+             recipe="lint", status="succeeded", hours_ago=4,
+             findings=[{"kind": "lint", "severity": "warn", "message": "ruff warning"}]),
+        # Finished 36h ago: outside the window, must not be counted.
+        dict(project_name="alpha", job_id="a-old",
+             recipe="audit", status="succeeded", hours_ago=36),
+    ]
+    for spec in seeded_jobs:
+        _seed_job(db_only, **spec)
+
+    scheduler = DigestScheduler(
+        db=db_only,
+        smtp=SmtpConfig(host="localhost", port=25, use_tls=False),
+    )
+    result = await scheduler.run_once(dry_run=True)
+
+    rendered = result["text"]
+    for marker in ("alpha::.", "beta::."):
+        assert marker in rendered
+    # Job ids are not expected in the rendered text anyway, so the run counts
+    # below are the real out-of-window check.
+    assert "a-old" not in rendered
+
+    sections = result["sections"]
+    alpha = next(sec for sec in sections if sec["project"] == "alpha")
+    beta = next(sec for sec in sections if sec["project"] == "beta")
+    # alpha keeps its pass + fail runs, beta keeps the single lint run.
+    assert len(alpha["runs"]) == 2
+    assert len(beta["runs"]) == 1
+    # Both the pass and the fail glyph show up for alpha.
+    seen_glyphs = {run["glyph"] for run in alpha["runs"]}
+    assert {"✓", "✗"} <= seen_glyphs
+
+
+@pytest.mark.asyncio
+async def test_digest_filters_by_notify_on(db_only):
+    """With notify.on=['audit_fail'] only the failed audit run is reported."""
+    from crafting_table.digest import DigestScheduler, SmtpConfig
+
+    _seed_project(db_only, name="filterproj", owner_token="o",
+                  email=["x@example.com"], notify_on=["audit_fail"])
+    seeded_jobs = [
+        dict(job_id="fp-pass", recipe="audit", status="succeeded", hours_ago=1),
+        dict(job_id="fp-fail", recipe="audit", status="failed", hours_ago=2, exit_code=1),
+        dict(job_id="fp-lint", recipe="lint", status="succeeded", hours_ago=2,
+             findings=[{"kind": "lint", "severity": "warn", "message": "w"}]),
+    ]
+    for spec in seeded_jobs:
+        _seed_job(db_only, project_name="filterproj", **spec)
+
+    scheduler = DigestScheduler(
+        db=db_only,
+        smtp=SmtpConfig(host="localhost", port=25, use_tls=False),
+    )
+    result = await scheduler.run_once(dry_run=True)
+
+    matching = [sec for sec in result["sections"] if sec["project"] == "filterproj"]
+    assert len(matching) == 1
+    kept_runs = matching[0]["runs"]
+    assert len(kept_runs) == 1
+    sole_run = kept_runs[0]
+    assert sole_run["recipe"] == "audit"
+    assert sole_run["status"] == "fail"
+
+
+@pytest.mark.asyncio
+async def test_digest_skips_projects_with_no_email(db_only):
+    """A project with an empty notify.email list gets no digest section."""
+    from crafting_table.digest import DigestScheduler, SmtpConfig
+
+    project_specs = (
+        ("silent", "o1", []),
+        ("loud", "o2", ["x@example.com"]),
+    )
+    for proj, owner, recipients in project_specs:
+        _seed_project(db_only, name=proj, owner_token=owner,
+                      email=recipients, notify_on=["nightly_summary"])
+    for proj, job in (("silent", "s1"), ("loud", "l1")):
+        _seed_job(db_only, project_name=proj, job_id=job,
+                  recipe="audit", status="succeeded", hours_ago=1)
+
+    scheduler = DigestScheduler(
+        db=db_only,
+        smtp=SmtpConfig(host="localhost", port=25, use_tls=False),
+    )
+    result = await scheduler.run_once(dry_run=True)
+
+    rendered_projects = {sec["project"] for sec in result["sections"]}
+    assert "loud" in rendered_projects
+    assert "silent" not in rendered_projects
+
+    # The per-project meta entry records why the silent project was skipped.
+    silent_entry = next(
+        meta for meta in result["projects"] if meta["project"] == "silent"
+    )
+    assert silent_entry["skipped_reason"] == "no_recipients"
+
+
+@pytest.mark.asyncio
+async def test_digest_smtp_send_via_mock(db_only):
+    """With smtplib.SMTP patched on the digest module, a real (non dry-run)
+    run calls send_message once per recipient and records a digest_runs row."""
+    from crafting_table import digest as digest_mod
+    from crafting_table.digest import DigestScheduler, SmtpConfig
+
+    _seed_project(db_only, name="mailme", owner_token="o",
+                  email=["a@example.com", "b@example.com"],
+                  notify_on=["nightly_summary"])
+    _seed_job(db_only, project_name="mailme", job_id="m1",
+              recipe="audit", status="succeeded", hours_ago=2)
+
+    smtp_settings = SmtpConfig(host="localhost", port=2525, use_tls=False,
+                               from_addr="ct@localhost")
+    scheduler = DigestScheduler(db=db_only, smtp=smtp_settings)
+
+    # The stand-in connection returns itself from __enter__ so it can be used
+    # as a context manager.
+    smtp_session = mock.MagicMock()
+    smtp_session.__enter__.return_value = smtp_session
+    smtp_session.__exit__.return_value = False
+
+    patcher = mock.patch.object(digest_mod.smtplib, "SMTP", return_value=smtp_session)
+    with patcher as smtp_factory:
+        result = await scheduler.run_once(dry_run=False)
+
+    assert smtp_factory.called, "smtplib.SMTP was never instantiated"
+    # Two recipients -> two send_message calls.
+    assert smtp_session.send_message.call_count == 2
+
+    # Exactly one idempotency row was written to digest_runs.
+    recorded = db_only.list_digest_runs()
+    assert len(recorded) == 1
+    row = recorded[0]
+    assert row["project_name"] == "mailme"
+    assert row["recipient_count"] == 2
+    assert row["job_count"] == 1
+
+    mailme_entry = next(
+        meta for meta in result["projects"] if meta["project"] == "mailme"
+    )
+    assert mailme_entry["sent"] is True
+
+
+@pytest.mark.asyncio
+async def test_digest_zero_state(db_only):
+    """With no jobs in the window:
+    - a project that asked for nightly_summary is still sent a digest
+    - a project that did not is skipped with reason zero_activity
+    """
+    from crafting_table import digest as digest_mod
+    from crafting_table.digest import DigestScheduler, SmtpConfig
+
+    project_specs = (
+        ("quiet-nightly", "o1", ["nightly_summary"]),
+        ("quiet-default", "o2", []),
+    )
+    for proj, owner, events in project_specs:
+        _seed_project(db_only, name=proj, owner_token=owner,
+                      email=["x@example.com"], notify_on=events)
+
+    scheduler = DigestScheduler(
+        db=db_only,
+        smtp=SmtpConfig(host="localhost", port=25, use_tls=False),
+    )
+
+    smtp_session = mock.MagicMock()
+    smtp_session.__enter__.return_value = smtp_session
+    smtp_session.__exit__.return_value = False
+    with mock.patch.object(digest_mod.smtplib, "SMTP", return_value=smtp_session):
+        result = await scheduler.run_once(dry_run=False)
+
+    meta_by_project = {meta["project"]: meta for meta in result["projects"]}
+    default_entry = meta_by_project["quiet-default"]
+    assert default_entry["skipped_reason"] == "zero_activity"
+    assert default_entry["sent"] is False
+
+    # The nightly_summary project has one recipient, hence one send.
+    assert meta_by_project["quiet-nightly"]["sent"] is True
+    assert smtp_session.send_message.call_count == 1
+
+
+@pytest.mark.asyncio
+async def test_digest_idempotent(db_only):
+    """Two run_once calls for the same date produce a single email and a
+    single digest_runs row for the project."""
+    from crafting_table import digest as digest_mod
+    from crafting_table.digest import DigestScheduler, SmtpConfig
+
+    _seed_project(db_only, name="idem", owner_token="o",
+                  email=["x@example.com"], notify_on=["nightly_summary"])
+    _seed_job(db_only, project_name="idem", job_id="i1",
+              recipe="audit", status="succeeded", hours_ago=1)
+
+    scheduler = DigestScheduler(
+        db=db_only,
+        smtp=SmtpConfig(host="localhost", port=25, use_tls=False),
+    )
+
+    smtp_session = mock.MagicMock()
+    smtp_session.__enter__.return_value = smtp_session
+    smtp_session.__exit__.return_value = False
+
+    with mock.patch.object(digest_mod.smtplib, "SMTP", return_value=smtp_session):
+        for _ in range(2):
+            await scheduler.run_once(dry_run=False)
+
+    # The second run must not have sent anything further.
+    assert smtp_session.send_message.call_count == 1
+    recorded = db_only.list_digest_runs()
+    assert len(recorded) == 1
+
+
+# ---------- HTTP-level test ------------------------------------------------
+
+
+def test_admin_digest_run_now_endpoint(client):
+    """POST /admin/digest/run-now with dry_run=True returns the rendered
+    digest for an admin caller and is refused (403) for a non-admin token."""
+    tc, ctx = client
+
+    def _auth(bearer_key):
+        # Authorization header for one of the bearer tokens held in ctx.
+        return {"Authorization": f"Bearer {ctx[bearer_key]}"}
+
+    # Register a project as alpha, with a recipient and nightly_summary on.
+    project_body = sample_project_payload(name="ep")
+    project_body["notify"] = {
+        "email": ["cobb@sulkta.com"],
+        "on": ["nightly_summary"],
+        "auto_patch": False,
+    }
+    created = tc.post("/projects", headers=_auth("alpha_bearer"), json=project_body)
+    assert created.status_code == 200, created.text
+
+    # Admin caller: dry-run digest comes back in the response.
+    admin_resp = tc.post(
+        "/admin/digest/run-now",
+        headers=_auth("admin_bearer"),
+        json={"dry_run": True},
+    )
+    assert admin_resp.status_code == 200, admin_resp.text
+    answer = admin_resp.json()
+    assert answer["ok"] is True
+    assert "digest" in answer
+    digest_payload = answer["digest"]
+    assert digest_payload["dry_run"] is True
+    assert "text" in digest_payload
+    assert "html" in digest_payload
+
+    # Non-admin caller: the same request is forbidden.
+    denied = tc.post(
+        "/admin/digest/run-now",
+        headers=_auth("alpha_bearer"),
+        json={"dry_run": True},
+    )
+    assert denied.status_code == 403
+
+
+def test_get_digest_endpoint_admin_only(client):
+    """GET /digests/{date}: admin gets the dry-run digest, a malformed date
+    is a 400, and a non-admin token is a 403."""
+    tc, ctx = client
+    admin_headers = {"Authorization": f"Bearer {ctx['admin_bearer']}"}
+    alpha_headers = {"Authorization": f"Bearer {ctx['alpha_bearer']}"}
+
+    ok_resp = tc.get("/digests/2026-04-29", headers=admin_headers)
+    assert ok_resp.status_code == 200, ok_resp.text
+    answer = ok_resp.json()
+    assert answer["ok"] is True
+    digest_payload = answer["digest"]
+    assert digest_payload["date"] == "2026-04-29"
+    assert digest_payload["dry_run"] is True
+
+    bad_date_resp = tc.get("/digests/not-a-date", headers=admin_headers)
+    assert bad_date_resp.status_code == 400
+
+    denied = tc.get("/digests/2026-04-29", headers=alpha_headers)
+    assert denied.status_code == 403