"""DB-level tests — schema, migrations, round-trips.""" from __future__ import annotations import json import time import pytest def test_migrations_applied(db_only): versions = db_only.applied_migrations() # All MIGRATIONS entries should land on first boot. assert "001_schema_migrations" in versions assert "002_tokens" in versions assert "003_projects" in versions assert "004_jobs" in versions assert "005_findings" in versions assert "006_digest_runs" in versions def test_migrations_idempotent(db_only): """Re-running migrate() is a no-op once applied — the INSERT OR IGNORE on schema_migrations is the test we care about here.""" before = set(db_only.applied_migrations()) second_pass = db_only.migrate() assert second_pass == [] # nothing applied after = set(db_only.applied_migrations()) assert before == after def test_schema_has_required_tables(db_only): with db_only._conn() as c: rows = c.execute( "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name" ).fetchall() names = {r["name"] for r in rows} for required in ("schema_migrations", "tokens", "projects", "jobs", "findings", "digest_runs"): assert required in names, f"missing table: {required}" def test_token_round_trip(db_only): db_only.insert_token( name="alpha", bearer="ct_test_alpha", is_admin=False, ip_cidrs=["10.0.0.0/8"], ) rec = db_only.lookup_token_by_bearer("ct_test_alpha") assert rec is not None assert rec["name"] == "alpha" assert rec["is_admin"] is False assert rec["ip_cidrs"] == ["10.0.0.0/8"] def test_token_revoke(db_only): db_only.insert_token(name="bravo", bearer="ct_b", is_admin=False, ip_cidrs=None) assert db_only.lookup_token_by_bearer("ct_b") is not None assert db_only.revoke_token("bravo") is True assert db_only.lookup_token_by_bearer("ct_b") is None def test_project_upsert_and_get(db_only): db_only.insert_token(name="owner1", bearer="t1", is_admin=False, ip_cidrs=None) row = db_only.upsert_project( name="proj-a", git_url="https://x.example/repo.git", default_branch="main", recipe_json='{"languages":["python"],"subprojects":[]}', owner_token="owner1", ) assert row["name"] == "proj-a" fetched = db_only.get_project("proj-a") assert fetched is not None assert fetched["git_url"] == "https://x.example/repo.git" assert fetched["owner_token"] == "owner1" def test_project_update_keeps_created_at(db_only): db_only.insert_token(name="o", bearer="t", is_admin=False, ip_cidrs=None) a = db_only.upsert_project( name="p", git_url="g1", default_branch="main", recipe_json="{}", owner_token="o", ) time.sleep(1.1) # so updated_at moves b = db_only.upsert_project( name="p", git_url="g2", default_branch="trunk", recipe_json="{}", owner_token="o", ) assert b["created_at"] == a["created_at"] assert b["updated_at"] >= a["updated_at"] assert b["git_url"] == "g2" assert b["default_branch"] == "trunk" def test_project_list_filters_by_owner(db_only): db_only.insert_token(name="o1", bearer="ta", is_admin=False, ip_cidrs=None) db_only.insert_token(name="o2", bearer="tb", is_admin=False, ip_cidrs=None) db_only.upsert_project(name="p1", git_url="g", default_branch="main", recipe_json="{}", owner_token="o1") db_only.upsert_project(name="p2", git_url="g", default_branch="main", recipe_json="{}", owner_token="o2") only_o1 = db_only.list_projects(owner_token="o1") assert {p["name"] for p in only_o1} == {"p1"} everyone = db_only.list_projects() assert {p["name"] for p in everyone} == {"p1", "p2"} def test_project_delete_cascades_jobs_and_findings(db_only): db_only.insert_token(name="o", bearer="t", is_admin=False, ip_cidrs=None) db_only.upsert_project(name="p", git_url="g", default_branch="main", recipe_json="{}", owner_token="o") db_only.insert_job( job_id="j1", project_name="p", subproject_path=".", recipe="test", branch="main", log_path="/tmp/x.log", recipe_snapshot_json="{}", ) db_only.insert_finding( job_id="j1", kind="lint", severity="warn", message="m", fingerprint="fp1", ) assert db_only.get_job("j1") is not None assert len(db_only.list_findings("j1")) == 1 db_only.delete_project("p") # FK ON DELETE CASCADE should kill the job and via second cascade the finding assert db_only.get_job("j1") is None assert db_only.list_findings("j1") == [] def test_job_round_trip(db_only): db_only.insert_token(name="o", bearer="t", is_admin=False, ip_cidrs=None) db_only.upsert_project(name="p", git_url="g", default_branch="main", recipe_json="{}", owner_token="o") row = db_only.insert_job( job_id="j2", project_name="p", subproject_path="clients/rust", recipe="audit", branch="main", log_path="/tmp/y.log", recipe_snapshot_json=json.dumps({"subprojects": [{"path": "clients/rust"}]}), ) assert row["status"] == "queued" db_only.mark_job_running("j2") j = db_only.get_job("j2") assert j["status"] == "running" assert j["started_at"] is not None db_only.mark_job_finished(job_id="j2", status="succeeded", exit_code=0) j2 = db_only.get_job("j2") assert j2["status"] == "succeeded" assert j2["exit_code"] == 0 assert j2["finished_at"] is not None def test_orphaned_running_jobs_marked_failed_on_boot(db_only, tmp_path): db_only.insert_token(name="o", bearer="t", is_admin=False, ip_cidrs=None) db_only.upsert_project(name="p", git_url="g", default_branch="main", recipe_json="{}", owner_token="o") db_only.insert_job( job_id="orphan", project_name="p", subproject_path=".", recipe="audit", branch="main", log_path=str(tmp_path / "orphan.log"), recipe_snapshot_json="{}", ) db_only.mark_job_running("orphan") ids = db_only.mark_orphaned_jobs_failed(log_dir=tmp_path) assert "orphan" in ids j = db_only.get_job("orphan") assert j["status"] == "failed" assert j["exit_code"] == -1 # The synthetic log line should have landed in the orphan's log file log_text = (tmp_path / "orphan.log").read_text() assert "runner restart" in log_text def test_finding_round_trip(db_only): db_only.insert_token(name="o", bearer="t", is_admin=False, ip_cidrs=None) db_only.upsert_project(name="p", git_url="g", default_branch="main", recipe_json="{}", owner_token="o") db_only.insert_job( job_id="jx", project_name="p", subproject_path=".", recipe="lint", branch="main", log_path="/tmp/z.log", recipe_snapshot_json="{}", ) db_only.insert_finding( job_id="jx", kind="lint", severity="warn", file="src/foo.py", line=12, code="E501", message="line too long", fingerprint="fp-1", ) findings = db_only.list_findings("jx") assert len(findings) == 1 assert findings[0]["file"] == "src/foo.py" assert findings[0]["line"] == 12 assert findings[0]["fingerprint"] == "fp-1" def test_jobs_filter_by_owner(db_only): db_only.insert_token(name="oa", bearer="ta", is_admin=False, ip_cidrs=None) db_only.insert_token(name="ob", bearer="tb", is_admin=False, ip_cidrs=None) db_only.upsert_project(name="pa", git_url="g", default_branch="main", recipe_json="{}", owner_token="oa") db_only.upsert_project(name="pb", git_url="g", default_branch="main", recipe_json="{}", owner_token="ob") db_only.insert_job(job_id="ja", project_name="pa", subproject_path=".", recipe="test", branch="main", log_path="/tmp/a.log", recipe_snapshot_json="{}") db_only.insert_job(job_id="jb", project_name="pb", subproject_path=".", recipe="test", branch="main", log_path="/tmp/b.log", recipe_snapshot_json="{}") only_oa = db_only.list_jobs(owner_token="oa") assert {j["id"] for j in only_oa} == {"ja"} only_running = db_only.list_jobs(status="running") assert only_running == []