From 98306ca2e0e735d81534a8c421ba6ca05471f786 Mon Sep 17 00:00:00 2001 From: Kayos Date: Wed, 29 Apr 2026 08:32:56 -0700 Subject: [PATCH] v0.1 wave 2C (step 8): email digest scheduler - digest.py: DigestScheduler with daily 06:00 PT loop - SmtpConfig env-driven (CRAFTING_SMTP_*) - notify.on event filter respected per project - GET /digests/{date} + POST /admin/digest/run-now (dry_run flag) - migration 006: digest_runs (idempotency via UNIQUE(date, project_name)) - text + HTML email bodies; matches spec's worked example - Server lifespan integration; gracefully disables if SMTP not configured - tests/test_digest.py: 8 tests (aggregation / filter / smtp mock / idempotency / endpoint) Patch-drafted line is a placeholder until wave 3 / step 9 ships. Spec: memory/spec-crafting-table.md --- .env.example | 11 + README.md | 52 +++- crafting_table/db.py | 61 +++++ crafting_table/digest.py | 551 +++++++++++++++++++++++++++++++++++++++ crafting_table/models.py | 15 ++ crafting_table/server.py | 51 ++++ tests/test_db.py | 5 +- tests/test_digest.py | 352 +++++++++++++++++++++++++ 8 files changed, 1095 insertions(+), 3 deletions(-) create mode 100644 crafting_table/digest.py create mode 100644 tests/test_digest.py diff --git a/.env.example b/.env.example index e813cee..e2318b9 100644 --- a/.env.example +++ b/.env.example @@ -31,3 +31,14 @@ CRAFTING_DEFAULT_JOB_TIMEOUT=1800 # Workspace gc — how often to sweep for stale worktrees, and the age cutoff. CRAFTING_GC_INTERVAL=3600 CRAFTING_GC_AGE=86400 + +# --- Email digest (optional, off by default) ------------------------------- +# If CRAFTING_SMTP_HOST is empty, the digest scheduler stays disabled and +# the server logs a "digest disabled" warning at startup. Setting it on +# enables the daily 06:00 PT loop. 
+# CRAFTING_SMTP_HOST=postfix.sulkta.com +# CRAFTING_SMTP_PORT=587 +# CRAFTING_SMTP_USER=crafting-table@sulkta.com +# CRAFTING_SMTP_PASS= +# CRAFTING_SMTP_FROM=crafting-table@sulkta.com +# CRAFTING_SMTP_TLS=1 diff --git a/README.md b/README.md index 34c6aaa..733e761 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ Spec: `Sulkta-Coop/openclaw-workspace/memory/spec-crafting-table.md` (LAN-only). - [ ] Step 5: Per-language parsers (Rust / Python / Go / TS first) - [ ] Step 6: Findings extraction + storage - [ ] Step 7: MCP server (stdio JSON-RPC, 8 tools) -- [ ] Step 8: Email digest scheduler +- [x] Step 8: Email digest scheduler - [ ] Step 9: Autonomous patch loop (clawdforge integration) - [ ] Step 10: Production recipes — clawdforge, cauldron, tradecraft @@ -222,6 +222,56 @@ pip install -e '.[test]' pytest tests/ ``` +## Digest + +Daily 06:00 PT email digest. One digest per project per day; aggregates the +project's last 24h of jobs and sends a copy to each recipient via SMTP relay (Lucy postfix). + +Set the SMTP block in `.env` to enable — leaving `CRAFTING_SMTP_HOST` unset +keeps the scheduler off and logs `digest disabled — CRAFTING_SMTP_HOST not set` +at startup. The `/digests` and `/admin/digest/run-now` endpoints still work +in dry-run mode regardless. + +```bash +CRAFTING_SMTP_HOST=postfix.sulkta.com +CRAFTING_SMTP_PORT=587 +CRAFTING_SMTP_USER=crafting-table@sulkta.com +CRAFTING_SMTP_PASS=... 
+CRAFTING_SMTP_FROM=crafting-table@sulkta.com +CRAFTING_SMTP_TLS=1 +``` + +Each project's `notify.email` + `notify.on` fields control delivery: + +| `notify.on` event | When it fires | +|---------------------|--------------------------------------------------------| +| `audit_pass` | a passing audit job | +| `audit_fail` | a failing audit job | +| `test_fail` | a failing test job | +| `lint_warn` | a lint job with warning-severity findings | +| `cve_found` | any job whose findings include a `cve` | +| `patch_drafted` | (wave 3 / step 9) auto-patch was drafted | +| `nightly_summary` | catch-all — show ALL jobs in the project's section | + +Empty `notify.on` defaults to `["audit_fail", "cve_found", "patch_drafted"]`. +Empty `notify.email` silently excludes the project. + +Manual trigger from the LAN admin token: + +```bash +# Render today's digest without sending +curl -sH "Authorization: Bearer $ADMIN" -H "Content-Type: application/json" \ + -X POST http://192.168.0.5:8810/admin/digest/run-now \ + -d '{"dry_run": true}' | jq . + +# Render an arbitrary date as JSON +curl -sH "Authorization: Bearer $ADMIN" \ + http://192.168.0.5:8810/digests/2026-04-29 | jq . +``` + +Idempotency: `digest_runs` table holds `UNIQUE(date, project_name)`, so the +06:00 loop is safe to re-fire on the same day — only the first call sends. 
+ ## License MIT diff --git a/crafting_table/db.py b/crafting_table/db.py index 8676155..fb1bd4d 100644 --- a/crafting_table/db.py +++ b/crafting_table/db.py @@ -112,6 +112,21 @@ MIGRATIONS: list[tuple[str, str]] = [ CREATE INDEX IF NOT EXISTS idx_findings_fingerprint ON findings(fingerprint); """, ), + ( + "006_digest_runs", + """ + CREATE TABLE IF NOT EXISTS digest_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + date TEXT NOT NULL, + project_name TEXT NOT NULL, + sent_at INTEGER NOT NULL, + recipient_count INTEGER NOT NULL, + job_count INTEGER NOT NULL, + UNIQUE(date, project_name) + ); + CREATE INDEX IF NOT EXISTS idx_digest_runs_date ON digest_runs(date); + """, + ), ] # fmt: on @@ -489,6 +504,52 @@ class DB: ).fetchall() return [dict(r) for r in rows] + # ---------- digest runs -------------------------------------------------- + + def record_digest_run( + self, + *, + date: str, + project_name: str, + sent_at: int, + recipient_count: int, + job_count: int, + ) -> bool: + """Record an attempted/successful digest send. UNIQUE(date, project_name) + enforces idempotency — a second call for the same date+project is a + no-op (returns False).""" + with self._conn() as c: + cur = c.execute( + """ + INSERT OR IGNORE INTO digest_runs + (date, project_name, sent_at, recipient_count, job_count) + VALUES (?, ?, ?, ?, ?) + """, + (date, project_name, sent_at, recipient_count, job_count), + ) + return cur.rowcount == 1 + + def digest_run_exists(self, date: str, project_name: str) -> bool: + with self._conn() as c: + row = c.execute( + "SELECT 1 FROM digest_runs WHERE date=? AND project_name=?", + (date, project_name), + ).fetchone() + return row is not None + + def list_digest_runs(self, date: str | None = None) -> list[dict]: + with self._conn() as c: + if date is None: + rows = c.execute( + "SELECT * FROM digest_runs ORDER BY date DESC, project_name" + ).fetchall() + else: + rows = c.execute( + "SELECT * FROM digest_runs WHERE date=? 
ORDER BY project_name", + (date,), + ).fetchall() + return [dict(r) for r in rows] + # ---------- async wrappers ---------------------------------------------- async def arun(self, fn, *args, **kwargs): diff --git a/crafting_table/digest.py b/crafting_table/digest.py new file mode 100644 index 0000000..7317288 --- /dev/null +++ b/crafting_table/digest.py @@ -0,0 +1,551 @@ +"""Email digest scheduler. + +Aggregates the last 24h of jobs per project, builds a text + HTML email body, +and sends it via SMTP. Runs daily at 06:00 PT (configurable). Each project's +`notify.on` event filter controls which job outcomes show up in its digest. + +Design notes: +- We use a sleep-until-next-window asyncio loop instead of cron — keeps the + scheduler in-process so we can dry-run via the API for testing. +- Idempotency is enforced by a `digest_runs` table with UNIQUE(date, project_name). + Calling run_once twice for the same date will only send one email per project. +- SMTP send is done via stdlib smtplib (sync) wrapped in run_in_executor so + we don't block the loop while postfix grumbles. +- If SMTP isn't configured, the server lifespan logs a warning and skips + scheduler startup. The /digests endpoints still work for dry-run rendering. +- Patch-drafted / auto-patches / bugs.sulkta.com numbers are zero-state + placeholders for v0.1 wave 2C — wave 3 / step 9 wires them. +""" +from __future__ import annotations + +import asyncio +import json +import logging +import os +import smtplib +import time +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from email.message import EmailMessage +from pathlib import Path +from typing import Iterable +from zoneinfo import ZoneInfo + +from .db import DB + + +log = logging.getLogger("crafting_table.digest") + + +# Recognized notify.on event tags. Documented for callers; the digest filter +# consults this to map job outcomes to events. 
+VALID_NOTIFY_EVENTS = ( + "audit_pass", + "audit_fail", + "test_fail", + "lint_warn", + "cve_found", + "patch_drafted", + "nightly_summary", +) + +# Default events when a project has notify.email but empty notify.on. +DEFAULT_NOTIFY_ON = ("audit_fail", "cve_found", "patch_drafted") + + +@dataclass(frozen=True) +class SmtpConfig: + host: str + port: int = 587 + username: str | None = None + password: str | None = None + from_addr: str = "crafting-table@localhost" + use_tls: bool = True + + @classmethod + def from_env(cls) -> "SmtpConfig | None": + """Read CRAFTING_SMTP_* env vars. Returns None if CRAFTING_SMTP_HOST + is missing — caller treats that as "digest disabled".""" + host = os.environ.get("CRAFTING_SMTP_HOST", "").strip() + if not host: + return None + return cls( + host=host, + port=int(os.environ.get("CRAFTING_SMTP_PORT", "587")), + username=os.environ.get("CRAFTING_SMTP_USER") or None, + password=os.environ.get("CRAFTING_SMTP_PASS") or None, + from_addr=os.environ.get("CRAFTING_SMTP_FROM", "crafting-table@localhost"), + use_tls=os.environ.get("CRAFTING_SMTP_TLS", "1").strip() not in ("0", "false", "no", ""), + ) + + +# --- helpers ---------------------------------------------------------------- + + +def _job_event_tags(job: dict, findings: list[dict]) -> set[str]: + """Map a job + its findings to notify.on event tags. 
+ + Mirrors the spec's vocabulary: + - audit/test/lint kind + status -> {recipe}_pass / {recipe}_fail / lint_warn + - any finding with kind=='cve' -> cve_found + - patch_drafted is reserved for wave 3 / step 9 (unused here) + """ + tags: set[str] = set() + recipe = job["recipe"] + status = job["status"] + if status == "succeeded": + tags.add(f"{recipe}_pass") + elif status in ("failed", "timed_out"): + tags.add(f"{recipe}_fail") + + has_warn = any(f.get("severity") == "warn" for f in findings) + if recipe == "lint" and has_warn: + tags.add("lint_warn") + + if any(f.get("kind") == "cve" for f in findings): + tags.add("cve_found") + + return tags + + +def _outcome_glyph(job: dict, findings: list[dict]) -> str: + """Choose the leading char per run line. Matches spec's worked example: + ✓ pass, ✗ fail/timed_out, ⚠ pass-with-warnings (lint warn / non-cve warn). + """ + if job["status"] in ("failed", "timed_out", "cancelled"): + return "✗" # ✗ + has_warn = any(f.get("severity") in ("warn", "warning") for f in findings) + if has_warn: + return "⚠" # ⚠ + return "✓" # ✓ + + +def _summarize_job(job: dict, findings: list[dict]) -> str: + """Human-readable trailing summary: '(N tests, M lints, K CVEs)' / fail msg.""" + recipe = job["recipe"] + status = job["status"] + if status in ("failed", "timed_out"): + # Pull a short reason from the first error finding if any exist. 
+ err = next((f for f in findings if f.get("severity") in ("error", "high", "critical")), None) + if err: + msg = (err.get("message") or "").splitlines()[0][:80] + return msg or f"{recipe} failed" + return f"{recipe} failed (exit={job.get('exit_code')})" + + n_findings = len(findings) + n_warn = sum(1 for f in findings if f.get("severity") in ("warn", "warning")) + n_cve = sum(1 for f in findings if f.get("kind") == "cve") + if recipe == "lint": + return f"0 errors, {n_warn} warnings" + if recipe == "audit": + return f"{n_cve} CVEs, {n_warn} warnings" + if recipe == "test": + return f"{n_findings} findings, {n_warn} warnings" + return f"{n_findings} findings" + + +def _filter_for_project(jobs_with_findings: list[tuple[dict, list[dict]]], notify_on: list[str]) -> list[tuple[dict, list[dict]]]: + """Apply a project's notify.on filter to its jobs. + + nightly_summary -> show everything. + Empty list -> use DEFAULT_NOTIFY_ON. + Otherwise -> include jobs whose event-tag set intersects notify_on. + """ + if not notify_on: + notify_on = list(DEFAULT_NOTIFY_ON) + if "nightly_summary" in notify_on: + return list(jobs_with_findings) + wanted = set(notify_on) + out = [] + for job, findings in jobs_with_findings: + tags = _job_event_tags(job, findings) + if tags & wanted: + out.append((job, findings)) + return out + + +# --- rendering -------------------------------------------------------------- + + +def _render_text(date_str: str, sections: list[dict], full_log_url: str) -> str: + """Build the text body. 
Matches the worked example in the spec.""" + total_runs = sum(len(s["runs"]) for s in sections) + total_drafted = 0 # placeholder, wave 3 + total_cves = sum(s["cves"] for s in sections) + subj_summary = f"{total_runs} build" + ("s" if total_runs != 1 else "") + lines = [] + lines.append(f"Subject: crafting-table digest — {date_str} ({subj_summary}, {total_drafted} patches drafted, {total_cves} CVEs)") + lines.append("") + lines.append("Overnight runs (last 24h):") + if not sections: + lines.append(" (no activity)") + else: + for s in sections: + for run in s["runs"]: + glyph = run["glyph"] + proj_sub = f"{s['project']}::{run['subproject']}" + lines.append( + f" {glyph} {proj_sub:<32s} {run['recipe']:<6s} {run['status']:<5s} ({run['summary']})" + ) + lines.append("") + lines.append("Open follow-ups:") + lines.append(" - 0 unmerged auto-patches") + lines.append(" - 0 manual review tickets in bugs.sulkta.com") + lines.append("") + lines.append(f"Full log: {full_log_url}") + return "\n".join(lines) + "\n" + + +def _render_html(date_str: str, sections: list[dict], full_log_url: str) -> str: + """Build the HTML body. Same content, table styling, monospace font.""" + total_runs = sum(len(s["runs"]) for s in sections) + total_drafted = 0 + total_cves = sum(s["cves"] for s in sections) + + rows = [] + for s in sections: + for run in s["runs"]: + proj_sub = f"{s['project']}::{run['subproject']}" + rows.append( + f"{run['glyph']}{proj_sub}" + f"{run['recipe']}{run['status']}" + f"{run['summary']}" + ) + if not rows: + rows.append('(no activity)') + + return f""" + +

crafting-table digest — {date_str}
+({total_runs} runs, {total_drafted} patches drafted, {total_cves} CVEs)

+

Overnight runs (last 24h)

+ + + +{''.join(rows)} + +
projectrecipestatussummary
+

Open follow-ups

+ +

Full log: {full_log_url}

+ +""" + + +# --- scheduler -------------------------------------------------------------- + + +class DigestScheduler: + """Daily 06:00 PT digest. Aggregates 24h of jobs per project + sends SMTP. + + Lifecycle: + scheduler = DigestScheduler(db, smtp_config) + await scheduler.start() + ... + await scheduler.stop() + + Manual trigger via run_once(...). Pass dry_run=True to render without + sending — the body is still returned so the /digests endpoint works. + """ + + def __init__( + self, + db: DB, + smtp: SmtpConfig | None, + *, + time_zone: str = "America/Los_Angeles", + hour: int = 6, + minute: int = 0, + full_log_base_url: str = "http://192.168.0.5:8810/digests", + ): + self.db = db + self.smtp = smtp + self.tz = ZoneInfo(time_zone) + self.hour = hour + self.minute = minute + self.full_log_base_url = full_log_base_url + + self._loop_task: asyncio.Task | None = None + self._stopping = False + + async def start(self) -> None: + if self._loop_task is not None: + return + self._stopping = False + self._loop_task = asyncio.create_task(self._loop()) + + async def stop(self) -> None: + self._stopping = True + if self._loop_task is not None: + self._loop_task.cancel() + try: + await self._loop_task + except asyncio.CancelledError: + pass + self._loop_task = None + + async def _loop(self) -> None: + try: + while not self._stopping: + wait = self._seconds_until_next_window() + try: + await asyncio.sleep(wait) + except asyncio.CancelledError: + raise + if self._stopping: + break + try: + await self.run_once() + except Exception as e: + log.exception("digest run_once failed: %s", e) + # Sleep ~1 minute past the trigger so the next call to + # _seconds_until_next_window doesn't re-fire this same window. + try: + await asyncio.sleep(60) + except asyncio.CancelledError: + raise + except asyncio.CancelledError: + raise + + def _seconds_until_next_window(self) -> float: + """Compute seconds to the next HH:MM in our timezone. 
If we're past + today's window, jumps to tomorrow.""" + now = datetime.now(self.tz) + target = now.replace(hour=self.hour, minute=self.minute, second=0, microsecond=0) + if target <= now: + target = target + timedelta(days=1) + return max(1.0, (target - now).total_seconds()) + + # ---------- digest core ------------------------------------------------ + + async def run_once( + self, + *, + target_date: str | None = None, + dry_run: bool = False, + now_ts: int | None = None, + ) -> dict: + """Build + (maybe) send the digest. + + target_date: 'YYYY-MM-DD' in the configured tz. Defaults to today. + dry_run: skip SMTP send, still record nothing in digest_runs. + now_ts: override "now" for tests; defaults to time.time(). + + Returns a summary: + { + "date": "YYYY-MM-DD", + "dry_run": bool, + "projects": [ + {"project": str, "recipients": [...], "job_count": int, + "sent": bool, "skipped_reason": str|None} + ], + "text": "", + "html": "", + "sections": [...], + } + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self._run_once_sync(target_date=target_date, dry_run=dry_run, now_ts=now_ts), + ) + + def _run_once_sync( + self, + *, + target_date: str | None, + dry_run: bool, + now_ts: int | None, + ) -> dict: + if now_ts is None: + now_ts = int(time.time()) + + if target_date is None: + date_str = datetime.fromtimestamp(now_ts, self.tz).strftime("%Y-%m-%d") + else: + date_str = target_date + + # Window: last 24h ending at "now" (or end of target_date if backfilling). + window_end = now_ts + if target_date is not None: + # Anchor to local-midnight + 24h of the target date so backfills + # are reproducible. + day = datetime.strptime(target_date, "%Y-%m-%d").replace(tzinfo=self.tz) + window_end = int((day + timedelta(days=1)).timestamp()) + window_start = window_end - 24 * 3600 + + # Pull all projects + their last-24h jobs + findings. 
+ projects = self.db.list_projects() + per_project_sections: list[dict] = [] + per_project_meta: list[dict] = [] + full_log_url = f"{self.full_log_base_url}/{date_str}" + + for prow in projects: + recipe = json.loads(prow.get("recipe_json") or "{}") + notify = recipe.get("notify") or {} + recipients: list[str] = list(notify.get("email") or []) + notify_on: list[str] = list(notify.get("on") or []) + + jobs = self.db.list_jobs(project_name=prow["name"], limit=500) + # Window filter — finished_at if present, else queued_at. + in_window = [] + for j in jobs: + ts = j.get("finished_at") or j.get("queued_at") or 0 + if window_start <= ts <= window_end: + in_window.append(j) + with_findings: list[tuple[dict, list[dict]]] = [ + (j, self.db.list_findings(j["id"])) for j in in_window + ] + filtered = _filter_for_project(with_findings, notify_on) + + section_runs: list[dict] = [] + cves = 0 + for job, findings in filtered: + section_runs.append({ + "subproject": job["subproject_path"], + "recipe": job["recipe"], + "status": "pass" if job["status"] == "succeeded" else + ("fail" if job["status"] in ("failed", "timed_out", "cancelled") else "warn"), + "summary": _summarize_job(job, findings), + "glyph": _outcome_glyph(job, findings), + }) + cves += sum(1 for f in findings if f.get("kind") == "cve") + + section = { + "project": prow["name"], + "runs": section_runs, + "cves": cves, + } + + meta = { + "project": prow["name"], + "recipients": recipients, + "job_count": len(filtered), + "sent": False, + "skipped_reason": None, + } + + # Decide whether to include this project's section in the body. 
+ # "include" decisions: + # - no recipients -> never include or send + # - recipients + activity -> include + send + # - recipients + no activity + nightly_summary -> include "no activity" + send + # - recipients + no activity + no nightly_summary -> skip silently + wants_summary = "nightly_summary" in notify_on + if not recipients: + meta["skipped_reason"] = "no_recipients" + per_project_meta.append(meta) + continue + if not section_runs and not wants_summary: + meta["skipped_reason"] = "zero_activity" + per_project_meta.append(meta) + continue + + per_project_sections.append(section) + per_project_meta.append(meta) + + text_body = _render_text(date_str, per_project_sections, full_log_url) + html_body = _render_html(date_str, per_project_sections, full_log_url) + + # Per-project send loop. Idempotency check via digest_runs UNIQUE. + for meta, section in zip( + [m for m in per_project_meta if m["skipped_reason"] is None], + per_project_sections, + ): + project_name = meta["project"] + if not dry_run: + already = self.db.digest_run_exists(date_str, project_name) + if already: + meta["skipped_reason"] = "already_sent" + continue + + # Build a per-project-scoped body. 
+ proj_text = _render_text(date_str, [section], full_log_url) + proj_html = _render_html(date_str, [section], full_log_url) + subject = ( + f"crafting-table digest — {date_str} " + f"({len(section['runs'])} runs, 0 patches drafted, {section['cves']} CVEs)" + ) + + if dry_run or self.smtp is None: + meta["sent"] = False + if self.smtp is None: + meta["skipped_reason"] = "smtp_disabled" + continue + + try: + self._send_email( + recipients=meta["recipients"], + subject=subject, + text=proj_text, + html=proj_html, + ) + meta["sent"] = True + self.db.record_digest_run( + date=date_str, + project_name=project_name, + sent_at=int(time.time()), + recipient_count=len(meta["recipients"]), + job_count=meta["job_count"], + ) + except Exception as e: + log.exception("digest SMTP send failed for %s: %s", project_name, e) + meta["sent"] = False + meta["skipped_reason"] = f"smtp_error:{type(e).__name__}" + + return { + "date": date_str, + "dry_run": dry_run, + "projects": per_project_meta, + "text": text_body, + "html": html_body, + "sections": per_project_sections, + "full_log_url": full_log_url, + } + + # ---------- SMTP send -------------------------------------------------- + + def _send_email( + self, + *, + recipients: list[str], + subject: str, + text: str, + html: str, + ) -> None: + """Send the digest to each recipient as its own MIME multipart email. + + We open one SMTP connection, then build and send a separate message per + address (To: names only that recipient) so each recipient gets a personal + copy and the test's assert-once-per-recipient pattern is meaningful. An + exception from any send propagates and aborts the remaining recipients. 
+ """ + if self.smtp is None: + raise RuntimeError("smtp not configured") + + with smtplib.SMTP(self.smtp.host, self.smtp.port, timeout=30) as conn: + if self.smtp.use_tls: + try: + conn.starttls() + except smtplib.SMTPNotSupportedError: + log.warning("SMTP server does not support STARTTLS; sending cleartext") + if self.smtp.username and self.smtp.password: + conn.login(self.smtp.username, self.smtp.password) + for addr in recipients: + msg = EmailMessage() + msg["Subject"] = subject + msg["From"] = self.smtp.from_addr + msg["To"] = addr + msg.set_content(text) + msg.add_alternative(html, subtype="html") + conn.send_message(msg) diff --git a/crafting_table/models.py b/crafting_table/models.py index 48b45c4..431ed96 100644 --- a/crafting_table/models.py +++ b/crafting_table/models.py @@ -42,6 +42,21 @@ class Schedule(BaseModel): class Notify(BaseModel): + """Per-project email digest preferences. + + `email` — recipient list. Empty list silently excludes the project from + every digest. + + `on` — event filter for which job outcomes show up in this project's + digest section. Recognized tags (see crafting_table.digest): + audit_pass / audit_fail / test_fail / lint_warn / cve_found / + patch_drafted (wave 3) / nightly_summary (show all) + Empty list defaults to `["audit_fail", "cve_found", "patch_drafted"]` + at digest-render time. + + `auto_patch` — wave 3 / step 9 toggle for the autonomous patch loop. 
+ """ + email: list[str] = Field(default_factory=list) on: list[str] = Field(default_factory=lambda: ["audit_fail", "cve_found", "patch_drafted"]) auto_patch: bool = False diff --git a/crafting_table/server.py b/crafting_table/server.py index 602ce90..81115f2 100644 --- a/crafting_table/server.py +++ b/crafting_table/server.py @@ -42,6 +42,7 @@ from fastapi.responses import FileResponse, JSONResponse from .auth import Auth, AppToken from .config import Config, load from .db import DB +from .digest import DigestScheduler, SmtpConfig from .models import ( CreateJobRequest, Project, @@ -71,6 +72,10 @@ runner: Runner = Runner( gc_interval_secs=cfg.workspace_gc_interval_secs, gc_age_secs=cfg.workspace_gc_age_secs, ) +# SMTP-driven; if CRAFTING_SMTP_HOST is unset the scheduler stays None and the +# lifespan logs a "digest disabled" warning. Endpoints still render in dry-run. +_smtp_cfg: SmtpConfig | None = SmtpConfig.from_env() +digest_scheduler: DigestScheduler = DigestScheduler(db=db, smtp=_smtp_cfg) # ---------- lifespan -------------------------------------------------------- @@ -80,6 +85,14 @@ runner: Runner = Runner( async def _lifespan(app: FastAPI): auth.bootstrap_admin(cfg.admin_bearer_path) await runner.start() + if _smtp_cfg is None: + log.warning("digest disabled — CRAFTING_SMTP_HOST not set") + else: + await digest_scheduler.start() + log.info( + "digest scheduler started: smtp=%s:%d from=%s", + _smtp_cfg.host, _smtp_cfg.port, _smtp_cfg.from_addr, + ) log.info( "crafting-table startup: db=%s log_dir=%s max_concurrent=%d port=%d", cfg.db_path, cfg.log_dir, cfg.max_concurrent_jobs, cfg.api_port, @@ -87,6 +100,7 @@ async def _lifespan(app: FastAPI): try: yield finally: + await digest_scheduler.stop() await runner.stop() log.info("crafting-table shutdown complete") @@ -459,6 +473,43 @@ async def get_job_findings( return {"ok": True, "findings": findings} +# ---- /digests -------------------------------------------------------------- + + 
+@app.get("/digests/{date}") +async def get_digest( + date: str, + request: Request, + authorization: Annotated[str | None, Header()] = None, +): + """Render the digest body for a given date (YYYY-MM-DD in PT) without + sending. Admin token required. Wave 2C: this is a dry-run rendering pass + so the email's "Full log:" link resolves; future waves can persist + rendered bodies to a blob store and pull them here for archive replay. + """ + auth.require_admin(request, authorization) + # basic shape validation + if len(date) != 10 or date[4] != "-" or date[7] != "-": + raise HTTPException(400, "date must be YYYY-MM-DD") + summary = await digest_scheduler.run_once(target_date=date, dry_run=True) + return {"ok": True, "digest": summary} + + +@app.post("/admin/digest/run-now") +async def admin_digest_run_now( + request: Request, + authorization: Annotated[str | None, Header()] = None, + body: dict | None = None, +): + """Manual digest trigger. Admin only. body={"dry_run": true} skips SMTP.""" + auth.require_admin(request, authorization) + body = body or {} + dry_run = bool(body.get("dry_run", False)) + target_date = body.get("date") + summary = await digest_scheduler.run_once(target_date=target_date, dry_run=dry_run) + return {"ok": True, "digest": summary} + + # ---------- helpers --------------------------------------------------------- diff --git a/tests/test_db.py b/tests/test_db.py index c310327..528f794 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -9,12 +9,13 @@ import pytest def test_migrations_applied(db_only): versions = db_only.applied_migrations() - # All 5 migrations from the MIGRATIONS list should land on first boot. + # All MIGRATIONS entries should land on first boot. 
assert "001_schema_migrations" in versions assert "002_tokens" in versions assert "003_projects" in versions assert "004_jobs" in versions assert "005_findings" in versions + assert "006_digest_runs" in versions def test_migrations_idempotent(db_only): @@ -33,7 +34,7 @@ def test_schema_has_required_tables(db_only): "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name" ).fetchall() names = {r["name"] for r in rows} - for required in ("schema_migrations", "tokens", "projects", "jobs", "findings"): + for required in ("schema_migrations", "tokens", "projects", "jobs", "findings", "digest_runs"): assert required in names, f"missing table: {required}" diff --git a/tests/test_digest.py b/tests/test_digest.py new file mode 100644 index 0000000..5c2a5f1 --- /dev/null +++ b/tests/test_digest.py @@ -0,0 +1,352 @@ +"""Email digest scheduler tests. + +Cover: +- 24h aggregation across multiple projects +- notify.on event filtering +- skipping projects with no notify.email +- SMTP send is invoked once per recipient (mocked smtplib.SMTP) +- zero-state behavior (skipped unless nightly_summary requested) +- Idempotency via digest_runs UNIQUE constraint +- POST /admin/digest/run-now endpoint + +We don't boot the full server unless an HTTP-level surface is being exercised. +For the unit-style tests we instantiate DigestScheduler directly against a +fresh DB. 
+""" +from __future__ import annotations + +import json +import time +from unittest import mock + +import pytest + +from tests.conftest import sample_project_payload + + +# ---------- helpers -------------------------------------------------------- + + +def _seed_project(db, *, name: str, owner_token: str = "owner-x", + email: list[str] | None = None, + notify_on: list[str] | None = None) -> None: + if email is None: + email = [] + if notify_on is None: + notify_on = [] + db.insert_token(name=owner_token, bearer=f"t-{owner_token}-{name}", + is_admin=False, ip_cidrs=None) + recipe = { + "languages": ["python"], + "subprojects": [{"path": ".", "language": "python"}], + "schedule": {}, + "notify": {"email": email, "on": notify_on, "auto_patch": False}, + } + db.upsert_project( + name=name, git_url="g", default_branch="main", + recipe_json=json.dumps(recipe), owner_token=owner_token, + ) + + +def _seed_job(db, *, project_name: str, job_id: str, + recipe: str = "audit", status: str = "succeeded", + hours_ago: float = 1.0, exit_code: int = 0, + subproject_path: str = ".", + findings: list[dict] | None = None) -> None: + """Insert a finished job N hours ago. Optional findings list adds + structured findings. tmp log_path is a string only — we don't write.""" + db.insert_job( + job_id=job_id, project_name=project_name, + subproject_path=subproject_path, recipe=recipe, branch="main", + log_path=f"/tmp/{job_id}.log", + recipe_snapshot_json="{}", + ) + # Backdate queued_at + finished_at by editing the row directly. + finished = int(time.time() - hours_ago * 3600) + queued = finished - 60 + started = finished - 30 + with db._conn() as c: + c.execute( + "UPDATE jobs SET queued_at=?, started_at=?, finished_at=?, status=?, exit_code=? 
WHERE id=?", + (queued, started, finished, status, exit_code, job_id), + ) + for f in findings or []: + db.insert_finding( + job_id=job_id, + kind=f.get("kind", "lint"), + severity=f.get("severity", "warn"), + file=f.get("file"), + line=f.get("line"), + code=f.get("code"), + message=f.get("message", "x"), + fingerprint=f.get("fingerprint", f"fp-{job_id}-{f.get('kind','lint')}"), + suggested_fix=f.get("suggested_fix"), + ) + + +# ---------- direct-call tests --------------------------------------------- + + +@pytest.mark.asyncio +async def test_digest_aggregates_jobs_in_window(db_only): + from crafting_table.digest import DigestScheduler, SmtpConfig + + _seed_project(db_only, name="alpha", owner_token="oa", + email=["cobb@sulkta.com"], notify_on=["nightly_summary"]) + _seed_project(db_only, name="beta", owner_token="ob", + email=["cobb@sulkta.com"], notify_on=["nightly_summary"]) + + _seed_job(db_only, project_name="alpha", job_id="a-pass", + recipe="audit", status="succeeded", hours_ago=2) + _seed_job(db_only, project_name="alpha", job_id="a-fail", + recipe="audit", status="failed", hours_ago=3, exit_code=1) + _seed_job(db_only, project_name="beta", job_id="b-warn", + recipe="lint", status="succeeded", hours_ago=4, + findings=[{"kind": "lint", "severity": "warn", "message": "ruff warning"}]) + # Out-of-window: 36h ago should not appear + _seed_job(db_only, project_name="alpha", job_id="a-old", + recipe="audit", status="succeeded", hours_ago=36) + + smtp = SmtpConfig(host="localhost", port=25, use_tls=False) + sched = DigestScheduler(db=db_only, smtp=smtp) + out = await sched.run_once(dry_run=True) + + text = out["text"] + assert "alpha::." in text + assert "beta::." 
in text + assert "a-old" not in text # out-of-window job's id won't render anyway + + # alpha has 2 in-window runs (pass + fail), beta has 1 lint warn + alpha_section = next(s for s in out["sections"] if s["project"] == "alpha") + beta_section = next(s for s in out["sections"] if s["project"] == "beta") + assert len(alpha_section["runs"]) == 2 + assert len(beta_section["runs"]) == 1 + # Pass + fail glyphs both appear + glyphs = {r["glyph"] for r in alpha_section["runs"]} + assert "✓" in glyphs and "✗" in glyphs + + +@pytest.mark.asyncio +async def test_digest_filters_by_notify_on(db_only): + """Project with notify.on=['audit_fail'] should ONLY see failed audits.""" + from crafting_table.digest import DigestScheduler, SmtpConfig + + _seed_project(db_only, name="filterproj", owner_token="o", + email=["x@example.com"], notify_on=["audit_fail"]) + _seed_job(db_only, project_name="filterproj", job_id="fp-pass", + recipe="audit", status="succeeded", hours_ago=1) + _seed_job(db_only, project_name="filterproj", job_id="fp-fail", + recipe="audit", status="failed", hours_ago=2, exit_code=1) + _seed_job(db_only, project_name="filterproj", job_id="fp-lint", + recipe="lint", status="succeeded", hours_ago=2, + findings=[{"kind": "lint", "severity": "warn", "message": "w"}]) + + smtp = SmtpConfig(host="localhost", port=25, use_tls=False) + sched = DigestScheduler(db=db_only, smtp=smtp) + out = await sched.run_once(dry_run=True) + + sections = [s for s in out["sections"] if s["project"] == "filterproj"] + assert len(sections) == 1 + runs = sections[0]["runs"] + assert len(runs) == 1 + assert runs[0]["recipe"] == "audit" + assert runs[0]["status"] == "fail" + + +@pytest.mark.asyncio +async def test_digest_skips_projects_with_no_email(db_only): + """Projects without notify.email must NOT appear in any sent body.""" + from crafting_table.digest import DigestScheduler, SmtpConfig + + _seed_project(db_only, name="silent", owner_token="o1", + email=[], notify_on=["nightly_summary"]) + 
_seed_project(db_only, name="loud", owner_token="o2", + email=["x@example.com"], notify_on=["nightly_summary"]) + _seed_job(db_only, project_name="silent", job_id="s1", + recipe="audit", status="succeeded", hours_ago=1) + _seed_job(db_only, project_name="loud", job_id="l1", + recipe="audit", status="succeeded", hours_ago=1) + + smtp = SmtpConfig(host="localhost", port=25, use_tls=False) + sched = DigestScheduler(db=db_only, smtp=smtp) + out = await sched.run_once(dry_run=True) + + sections_by_proj = {s["project"] for s in out["sections"]} + assert "loud" in sections_by_proj + assert "silent" not in sections_by_proj + + # Meta record reflects skip reason for the silent project + silent_meta = next(m for m in out["projects"] if m["project"] == "silent") + assert silent_meta["skipped_reason"] == "no_recipients" + + +@pytest.mark.asyncio +async def test_digest_smtp_send_via_mock(db_only): + """Patching smtplib.SMTP at the module level — confirms send_message is + called once per recipient and the digest_runs row is recorded.""" + from crafting_table import digest as digest_mod + from crafting_table.digest import DigestScheduler, SmtpConfig + + _seed_project(db_only, name="mailme", owner_token="o", + email=["a@example.com", "b@example.com"], + notify_on=["nightly_summary"]) + _seed_job(db_only, project_name="mailme", job_id="m1", + recipe="audit", status="succeeded", hours_ago=2) + + smtp = SmtpConfig(host="localhost", port=2525, use_tls=False, + from_addr="ct@localhost") + sched = DigestScheduler(db=db_only, smtp=smtp) + + fake_conn = mock.MagicMock() + fake_conn.__enter__.return_value = fake_conn + fake_conn.__exit__.return_value = False + + with mock.patch.object(digest_mod.smtplib, "SMTP", return_value=fake_conn) as smtp_cls: + out = await sched.run_once(dry_run=False) + + assert smtp_cls.called, "smtplib.SMTP was never instantiated" + # send_message called once per recipient + assert fake_conn.send_message.call_count == 2 + + # digest_runs has the idempotency row + 
runs = db_only.list_digest_runs() + assert len(runs) == 1 + assert runs[0]["project_name"] == "mailme" + assert runs[0]["recipient_count"] == 2 + assert runs[0]["job_count"] == 1 + + mailme_meta = next(m for m in out["projects"] if m["project"] == "mailme") + assert mailme_meta["sent"] is True + + +@pytest.mark.asyncio +async def test_digest_zero_state(db_only): + """No jobs in window: + - project with nightly_summary -> body still rendered + sent + - project without nightly_summary -> skipped silently (zero_activity) + """ + from crafting_table import digest as digest_mod + from crafting_table.digest import DigestScheduler, SmtpConfig + + _seed_project(db_only, name="quiet-nightly", owner_token="o1", + email=["x@example.com"], notify_on=["nightly_summary"]) + _seed_project(db_only, name="quiet-default", owner_token="o2", + email=["x@example.com"], notify_on=[]) + + smtp = SmtpConfig(host="localhost", port=25, use_tls=False) + sched = DigestScheduler(db=db_only, smtp=smtp) + + fake_conn = mock.MagicMock() + fake_conn.__enter__.return_value = fake_conn + fake_conn.__exit__.return_value = False + with mock.patch.object(digest_mod.smtplib, "SMTP", return_value=fake_conn): + out = await sched.run_once(dry_run=False) + + metas = {m["project"]: m for m in out["projects"]} + assert metas["quiet-default"]["skipped_reason"] == "zero_activity" + assert metas["quiet-default"]["sent"] is False + + # nightly_summary project still got a send (one recipient) + assert metas["quiet-nightly"]["sent"] is True + assert fake_conn.send_message.call_count == 1 + + +@pytest.mark.asyncio +async def test_digest_idempotent(db_only): + """Calling run_once twice for the same date should send ONE email per + project, not two — second call hits the digest_runs UNIQUE.""" + from crafting_table import digest as digest_mod + from crafting_table.digest import DigestScheduler, SmtpConfig + + _seed_project(db_only, name="idem", owner_token="o", + email=["x@example.com"], notify_on=["nightly_summary"]) 
+ _seed_job(db_only, project_name="idem", job_id="i1", + recipe="audit", status="succeeded", hours_ago=1) + + smtp = SmtpConfig(host="localhost", port=25, use_tls=False) + sched = DigestScheduler(db=db_only, smtp=smtp) + + fake_conn = mock.MagicMock() + fake_conn.__enter__.return_value = fake_conn + fake_conn.__exit__.return_value = False + + with mock.patch.object(digest_mod.smtplib, "SMTP", return_value=fake_conn): + await sched.run_once(dry_run=False) + await sched.run_once(dry_run=False) + + # Only one send despite two run_once calls + assert fake_conn.send_message.call_count == 1 + runs = db_only.list_digest_runs() + assert len(runs) == 1 + + +# ---------- HTTP-level test ------------------------------------------------ + + +def test_admin_digest_run_now_endpoint(client): + """POST /admin/digest/run-now with dry_run=True returns the digest body + and never touches SMTP.""" + tc, ctx = client + + # Register a project under alpha with email + nightly_summary + payload = sample_project_payload(name="ep") + payload["notify"] = { + "email": ["cobb@sulkta.com"], + "on": ["nightly_summary"], + "auto_patch": False, + } + r = tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=payload, + ) + assert r.status_code == 200, r.text + + # Admin POST /admin/digest/run-now + r2 = tc.post( + "/admin/digest/run-now", + headers={"Authorization": f"Bearer {ctx['admin_bearer']}"}, + json={"dry_run": True}, + ) + assert r2.status_code == 200, r2.text + body = r2.json() + assert body["ok"] is True + assert "digest" in body + assert body["digest"]["dry_run"] is True + assert "text" in body["digest"] + assert "html" in body["digest"] + + # Non-admin shouldn't be able to call this + r3 = tc.post( + "/admin/digest/run-now", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json={"dry_run": True}, + ) + assert r3.status_code == 403 + + +def test_get_digest_endpoint_admin_only(client): + """GET /digests/{date} requires admin and 
returns rendered body.""" + tc, ctx = client + + r = tc.get( + "/digests/2026-04-29", + headers={"Authorization": f"Bearer {ctx['admin_bearer']}"}, + ) + assert r.status_code == 200, r.text + body = r.json() + assert body["ok"] is True + assert body["digest"]["date"] == "2026-04-29" + assert body["digest"]["dry_run"] is True + + r2 = tc.get( + "/digests/not-a-date", + headers={"Authorization": f"Bearer {ctx['admin_bearer']}"}, + ) + assert r2.status_code == 400 + + r3 = tc.get( + "/digests/2026-04-29", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + ) + assert r3.status_code == 403