crafting-table/tests/test_digest.py
Cobb Hayes b335405c02 Public-flip audit: generalize internal hosts/paths + drop Sulkta-internal refs
URLs, mount paths, and LAN host bindings parameterized via env or relative paths
so the repo stands up from a clean clone anywhere. Drop cross-codebase refs
("mirrors clawdforge's pattern"), Sulkta-Coop client/merchant test fixtures,
and audit-changelog scaffolding from comments. README terser, technical content
preserved.
2026-05-27 11:25:47 -07:00

352 lines
13 KiB
Python

"""Email digest scheduler tests.
Cover:
- 24h aggregation across multiple projects
- notify.on event filtering
- skipping projects with no notify.email
- SMTP send is invoked once per recipient (mocked smtplib.SMTP)
- zero-state behavior (skipped unless nightly_summary requested)
- Idempotency via digest_runs UNIQUE constraint
- POST /admin/digest/run-now endpoint
We don't boot the full server unless an HTTP-level surface is being exercised.
For the unit-style tests we instantiate DigestScheduler directly against a
fresh DB.
"""
from __future__ import annotations
import json
import time
from unittest import mock
import pytest
from tests.conftest import sample_project_payload
# ---------- helpers --------------------------------------------------------
def _seed_project(db, *, name: str, owner_token: str = "owner-x",
email: list[str] | None = None,
notify_on: list[str] | None = None) -> None:
if email is None:
email = []
if notify_on is None:
notify_on = []
db.insert_token(name=owner_token, bearer=f"t-{owner_token}-{name}",
is_admin=False, ip_cidrs=None)
recipe = {
"languages": ["python"],
"subprojects": [{"path": ".", "language": "python"}],
"schedule": {},
"notify": {"email": email, "on": notify_on, "auto_patch": False},
}
db.upsert_project(
name=name, git_url="g", default_branch="main",
recipe_json=json.dumps(recipe), owner_token=owner_token,
)
def _seed_job(db, *, project_name: str, job_id: str,
recipe: str = "audit", status: str = "succeeded",
hours_ago: float = 1.0, exit_code: int = 0,
subproject_path: str = ".",
findings: list[dict] | None = None) -> None:
"""Insert a finished job N hours ago. Optional findings list adds
structured findings. tmp log_path is a string only — we don't write."""
db.insert_job(
job_id=job_id, project_name=project_name,
subproject_path=subproject_path, recipe=recipe, branch="main",
log_path=f"/tmp/{job_id}.log",
recipe_snapshot_json="{}",
)
# Backdate queued_at + finished_at by editing the row directly.
finished = int(time.time() - hours_ago * 3600)
queued = finished - 60
started = finished - 30
with db._conn() as c:
c.execute(
"UPDATE jobs SET queued_at=?, started_at=?, finished_at=?, status=?, exit_code=? WHERE id=?",
(queued, started, finished, status, exit_code, job_id),
)
for f in findings or []:
db.insert_finding(
job_id=job_id,
kind=f.get("kind", "lint"),
severity=f.get("severity", "warn"),
file=f.get("file"),
line=f.get("line"),
code=f.get("code"),
message=f.get("message", "x"),
fingerprint=f.get("fingerprint", f"fp-{job_id}-{f.get('kind','lint')}"),
suggested_fix=f.get("suggested_fix"),
)
# ---------- direct-call tests ---------------------------------------------
@pytest.mark.asyncio
async def test_digest_aggregates_jobs_in_window(db_only):
from crafting_table.digest import DigestScheduler, SmtpConfig
_seed_project(db_only, name="alpha", owner_token="oa",
email=["alerts@example.com"], notify_on=["nightly_summary"])
_seed_project(db_only, name="beta", owner_token="ob",
email=["alerts@example.com"], notify_on=["nightly_summary"])
_seed_job(db_only, project_name="alpha", job_id="a-pass",
recipe="audit", status="succeeded", hours_ago=2)
_seed_job(db_only, project_name="alpha", job_id="a-fail",
recipe="audit", status="failed", hours_ago=3, exit_code=1)
_seed_job(db_only, project_name="beta", job_id="b-warn",
recipe="lint", status="succeeded", hours_ago=4,
findings=[{"kind": "lint", "severity": "warn", "message": "ruff warning"}])
# Out-of-window: 36h ago should not appear
_seed_job(db_only, project_name="alpha", job_id="a-old",
recipe="audit", status="succeeded", hours_ago=36)
smtp = SmtpConfig(host="localhost", port=25, use_tls=False)
sched = DigestScheduler(db=db_only, smtp=smtp)
out = await sched.run_once(dry_run=True)
text = out["text"]
assert "alpha::." in text
assert "beta::." in text
assert "a-old" not in text # out-of-window job's id won't render anyway
# alpha has 2 in-window runs (pass + fail), beta has 1 lint warn
alpha_section = next(s for s in out["sections"] if s["project"] == "alpha")
beta_section = next(s for s in out["sections"] if s["project"] == "beta")
assert len(alpha_section["runs"]) == 2
assert len(beta_section["runs"]) == 1
# Pass + fail glyphs both appear
glyphs = {r["glyph"] for r in alpha_section["runs"]}
assert "" in glyphs and "" in glyphs
@pytest.mark.asyncio
async def test_digest_filters_by_notify_on(db_only):
"""Project with notify.on=['audit_fail'] should ONLY see failed audits."""
from crafting_table.digest import DigestScheduler, SmtpConfig
_seed_project(db_only, name="filterproj", owner_token="o",
email=["x@example.com"], notify_on=["audit_fail"])
_seed_job(db_only, project_name="filterproj", job_id="fp-pass",
recipe="audit", status="succeeded", hours_ago=1)
_seed_job(db_only, project_name="filterproj", job_id="fp-fail",
recipe="audit", status="failed", hours_ago=2, exit_code=1)
_seed_job(db_only, project_name="filterproj", job_id="fp-lint",
recipe="lint", status="succeeded", hours_ago=2,
findings=[{"kind": "lint", "severity": "warn", "message": "w"}])
smtp = SmtpConfig(host="localhost", port=25, use_tls=False)
sched = DigestScheduler(db=db_only, smtp=smtp)
out = await sched.run_once(dry_run=True)
sections = [s for s in out["sections"] if s["project"] == "filterproj"]
assert len(sections) == 1
runs = sections[0]["runs"]
assert len(runs) == 1
assert runs[0]["recipe"] == "audit"
assert runs[0]["status"] == "fail"
@pytest.mark.asyncio
async def test_digest_skips_projects_with_no_email(db_only):
"""Projects without notify.email must NOT appear in any sent body."""
from crafting_table.digest import DigestScheduler, SmtpConfig
_seed_project(db_only, name="silent", owner_token="o1",
email=[], notify_on=["nightly_summary"])
_seed_project(db_only, name="loud", owner_token="o2",
email=["x@example.com"], notify_on=["nightly_summary"])
_seed_job(db_only, project_name="silent", job_id="s1",
recipe="audit", status="succeeded", hours_ago=1)
_seed_job(db_only, project_name="loud", job_id="l1",
recipe="audit", status="succeeded", hours_ago=1)
smtp = SmtpConfig(host="localhost", port=25, use_tls=False)
sched = DigestScheduler(db=db_only, smtp=smtp)
out = await sched.run_once(dry_run=True)
sections_by_proj = {s["project"] for s in out["sections"]}
assert "loud" in sections_by_proj
assert "silent" not in sections_by_proj
# Meta record reflects skip reason for the silent project
silent_meta = next(m for m in out["projects"] if m["project"] == "silent")
assert silent_meta["skipped_reason"] == "no_recipients"
@pytest.mark.asyncio
async def test_digest_smtp_send_via_mock(db_only):
"""Patching smtplib.SMTP at the module level — confirms send_message is
called once per recipient and the digest_runs row is recorded."""
from crafting_table import digest as digest_mod
from crafting_table.digest import DigestScheduler, SmtpConfig
_seed_project(db_only, name="mailme", owner_token="o",
email=["a@example.com", "b@example.com"],
notify_on=["nightly_summary"])
_seed_job(db_only, project_name="mailme", job_id="m1",
recipe="audit", status="succeeded", hours_ago=2)
smtp = SmtpConfig(host="localhost", port=2525, use_tls=False,
from_addr="ct@localhost")
sched = DigestScheduler(db=db_only, smtp=smtp)
fake_conn = mock.MagicMock()
fake_conn.__enter__.return_value = fake_conn
fake_conn.__exit__.return_value = False
with mock.patch.object(digest_mod.smtplib, "SMTP", return_value=fake_conn) as smtp_cls:
out = await sched.run_once(dry_run=False)
assert smtp_cls.called, "smtplib.SMTP was never instantiated"
# send_message called once per recipient
assert fake_conn.send_message.call_count == 2
# digest_runs has the idempotency row
runs = db_only.list_digest_runs()
assert len(runs) == 1
assert runs[0]["project_name"] == "mailme"
assert runs[0]["recipient_count"] == 2
assert runs[0]["job_count"] == 1
mailme_meta = next(m for m in out["projects"] if m["project"] == "mailme")
assert mailme_meta["sent"] is True
@pytest.mark.asyncio
async def test_digest_zero_state(db_only):
"""No jobs in window:
- project with nightly_summary -> body still rendered + sent
- project without nightly_summary -> skipped silently (zero_activity)
"""
from crafting_table import digest as digest_mod
from crafting_table.digest import DigestScheduler, SmtpConfig
_seed_project(db_only, name="quiet-nightly", owner_token="o1",
email=["x@example.com"], notify_on=["nightly_summary"])
_seed_project(db_only, name="quiet-default", owner_token="o2",
email=["x@example.com"], notify_on=[])
smtp = SmtpConfig(host="localhost", port=25, use_tls=False)
sched = DigestScheduler(db=db_only, smtp=smtp)
fake_conn = mock.MagicMock()
fake_conn.__enter__.return_value = fake_conn
fake_conn.__exit__.return_value = False
with mock.patch.object(digest_mod.smtplib, "SMTP", return_value=fake_conn):
out = await sched.run_once(dry_run=False)
metas = {m["project"]: m for m in out["projects"]}
assert metas["quiet-default"]["skipped_reason"] == "zero_activity"
assert metas["quiet-default"]["sent"] is False
# nightly_summary project still got a send (one recipient)
assert metas["quiet-nightly"]["sent"] is True
assert fake_conn.send_message.call_count == 1
@pytest.mark.asyncio
async def test_digest_idempotent(db_only):
"""Calling run_once twice for the same date should send ONE email per
project, not two — second call hits the digest_runs UNIQUE."""
from crafting_table import digest as digest_mod
from crafting_table.digest import DigestScheduler, SmtpConfig
_seed_project(db_only, name="idem", owner_token="o",
email=["x@example.com"], notify_on=["nightly_summary"])
_seed_job(db_only, project_name="idem", job_id="i1",
recipe="audit", status="succeeded", hours_ago=1)
smtp = SmtpConfig(host="localhost", port=25, use_tls=False)
sched = DigestScheduler(db=db_only, smtp=smtp)
fake_conn = mock.MagicMock()
fake_conn.__enter__.return_value = fake_conn
fake_conn.__exit__.return_value = False
with mock.patch.object(digest_mod.smtplib, "SMTP", return_value=fake_conn):
await sched.run_once(dry_run=False)
await sched.run_once(dry_run=False)
# Only one send despite two run_once calls
assert fake_conn.send_message.call_count == 1
runs = db_only.list_digest_runs()
assert len(runs) == 1
# ---------- HTTP-level test ------------------------------------------------
def test_admin_digest_run_now_endpoint(client):
"""POST /admin/digest/run-now with dry_run=True returns the digest body
and never touches SMTP."""
tc, ctx = client
# Register a project under alpha with email + nightly_summary
payload = sample_project_payload(name="ep")
payload["notify"] = {
"email": ["alerts@example.com"],
"on": ["nightly_summary"],
"auto_patch": False,
}
r = tc.post(
"/projects",
headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"},
json=payload,
)
assert r.status_code == 200, r.text
# Admin POST /admin/digest/run-now
r2 = tc.post(
"/admin/digest/run-now",
headers={"Authorization": f"Bearer {ctx['admin_bearer']}"},
json={"dry_run": True},
)
assert r2.status_code == 200, r2.text
body = r2.json()
assert body["ok"] is True
assert "digest" in body
assert body["digest"]["dry_run"] is True
assert "text" in body["digest"]
assert "html" in body["digest"]
# Non-admin shouldn't be able to call this
r3 = tc.post(
"/admin/digest/run-now",
headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"},
json={"dry_run": True},
)
assert r3.status_code == 403
def test_get_digest_endpoint_admin_only(client):
"""GET /digests/{date} requires admin and returns rendered body."""
tc, ctx = client
r = tc.get(
"/digests/2026-04-29",
headers={"Authorization": f"Bearer {ctx['admin_bearer']}"},
)
assert r.status_code == 200, r.text
body = r.json()
assert body["ok"] is True
assert body["digest"]["date"] == "2026-04-29"
assert body["digest"]["dry_run"] is True
r2 = tc.get(
"/digests/not-a-date",
headers={"Authorization": f"Bearer {ctx['admin_bearer']}"},
)
assert r2.status_code == 400
r3 = tc.get(
"/digests/2026-04-29",
headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"},
)
assert r3.status_code == 403