From 0ec3a04676931ea5db576e126c70f235fef208d1 Mon Sep 17 00:00:00 2001 From: Kayos Date: Wed, 29 Apr 2026 08:17:41 -0700 Subject: [PATCH] v0.1 wave 1 (steps 2+3+4): SQLite ledger + FastAPI skeleton + async job runner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - db.py: migrations + DAOs for tokens / projects / jobs / findings (SQLite WAL) - auth.py: SHA-256 bearer hashing + LAN-CIDR allowlist + admin/app token tiers - models.py: Pydantic shapes (Project, Subproject, Schedule, Notify, Job, CreateJobRequest) - server.py: FastAPI on port 8810; /healthz, /admin/tokens/*, /projects/*, /jobs, /jobs/{id}, /jobs/{id}/log, /jobs/{id}/findings - runner.py: bounded asyncio pool, per-job timeout with process-group SIGTERM→SIGKILL escalation, orphaned-job recovery on boot - workspace.py: bare-clone + worktree materialization, gc - config.py: env-driven - 62 tests across db / auth / projects / jobs / runner / e2e — all green Cross-token project access returns 404 (not 403) — existence-leak guard. Bearer tokens hashed at rest; admin token bootstrapped on first boot. Recipe subprocess uses start_new_session=True so killpg targets the whole process tree on timeout — child processes can't escape SIGKILL. Pump task guarded with wait_for(2s) + cancel fallback against any orphan that survives the group kill. Wave 2 (parsers + findings extraction + MCP + email digest) pending. Spec: memory/spec-crafting-table.md --- .env.example | 33 +++ .gitignore | 1 + crafting_table/__init__.py | 7 + crafting_table/auth.py | 153 ++++++++++ crafting_table/config.py | 59 ++++ crafting_table/db.py | 502 +++++++++++++++++++++++++++++++++ crafting_table/models.py | 111 ++++++++ crafting_table/runner.py | 408 +++++++++++++++++++++++++++ crafting_table/server.py | 484 +++++++++++++++++++++++++++++++ crafting_table/workspace.py | 195 +++++++++++++ pyproject.toml | 40 +++ requirements.txt | 3 + tests/__init__.py | 0 tests/conftest.py | 159 +++++++++++ tests/test_auth.py | 122 ++++++++ tests/test_db.py | 214 ++++++++++++++ tests/test_e2e_register_run.py | 122 ++++++++ tests/test_jobs_api.py | 248 ++++++++++++++++ tests/test_projects_api.py | 235 +++++++++++++++ tests/test_runner.py | 232 +++++++++++++++ 20 files changed, 3328 insertions(+) create mode 100644 .env.example create mode 100644 crafting_table/__init__.py create mode 100644 crafting_table/auth.py create mode 100644 crafting_table/config.py create mode 100644 crafting_table/db.py create mode 100644 crafting_table/models.py create mode 100644 crafting_table/runner.py create mode 100644 crafting_table/server.py create mode 100644 crafting_table/workspace.py create mode 100644 pyproject.toml create mode 100644 requirements.txt create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/test_auth.py create mode 100644 tests/test_db.py create mode 100644 tests/test_e2e_register_run.py create mode 100644 tests/test_jobs_api.py create mode 100644 tests/test_projects_api.py create mode 100644 tests/test_runner.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..e813cee --- /dev/null +++ b/.env.example @@ -0,0 +1,33 @@ +# crafting-table runtime config — every key is optional; defaults shown. +# +# Copy to `.env` and edit, or pass each key explicitly to `docker compose`. + +# SQLite ledger location (created on first boot) +CRAFTING_DB=/data/crafting.db + +# Where workspaces (bare clones + per-job worktrees) live +CRAFTING_WORKSPACE=/workspace + +# Per-job log files: /data/jobs/.log +CRAFTING_LOG_DIR=/data/jobs + +# Admin bearer plaintext is written here on first boot, chmod 600 +CRAFTING_ADMIN_BEARER=/data/admin-bearer.txt + +# Bounded asyncio pool size — how many recipes can run concurrently +CRAFTING_MAX_CONCURRENT=4 + +# HTTP listen socket +CRAFTING_PORT=8810 +CRAFTING_BIND=0.0.0.0 + +# Default per-job timeout in seconds (recipes can override via timeout_secs) +CRAFTING_DEFAULT_JOB_TIMEOUT=1800 + +# Override the default LAN allowlist if you want stricter scoping. +# Default: 10/8, 172.16/12, 192.168/16, 127/8, ::1/128 +# CRAFTING_LAN_CIDRS=192.168.0.0/16,127.0.0.0/8 + +# Workspace gc — how often to sweep for stale worktrees, and the age cutoff. +CRAFTING_GC_INTERVAL=3600 +CRAFTING_GC_AGE=86400 diff --git a/.gitignore b/.gitignore index 5d44c72..fd92415 100644 --- a/.gitignore +++ b/.gitignore @@ -46,3 +46,4 @@ obj/ .vscode/ .idea/ *.swp +*.egg-info/ diff --git a/crafting_table/__init__.py b/crafting_table/__init__.py new file mode 100644 index 0000000..b0569b1 --- /dev/null +++ b/crafting_table/__init__.py @@ -0,0 +1,7 @@ +"""crafting-table — polyglot dev/build/audit container. + +Wave 1 (steps 2+3+4): SQLite ledger + FastAPI skeleton + async job runner. +Spec: memory/spec-crafting-table.md +""" + +__version__ = "0.1.0" diff --git a/crafting_table/auth.py b/crafting_table/auth.py new file mode 100644 index 0000000..75f6cc6 --- /dev/null +++ b/crafting_table/auth.py @@ -0,0 +1,153 @@ +"""Bearer + IP allowlist authentication. + +Mirrors clawdforge's pattern: +- Bearer tokens hashed at rest (SHA-256). No plaintext stored. +- Per-token IP allowlist (CIDR list). NULL means "any RFC1918 + loopback" + via the global LAN allowlist. +- Admin tokens are flagged in the tokens table — server-side admin checks + query `is_admin` rather than comparing to a bootstrap string. +- Loopback always allowed (test client uses 127.0.0.1; FastAPI's + `request.client.host` returns 'testclient' under TestClient and we patch + that in tests). +- Bearer tokens NEVER appear in error messages or log lines. Same hygiene + as clawdforge. +""" +from __future__ import annotations + +import ipaddress +import logging +import secrets +from dataclasses import dataclass +from pathlib import Path + +from fastapi import HTTPException, Request + +from .db import DB + + +log = logging.getLogger("crafting_table.auth") + +ADMIN_TOKEN_NAME = "admin" +ADMIN_TOKEN_PREFIX = "ct_" + + +@dataclass +class AppToken: + name: str + is_admin: bool + ip_cidrs: list[str] | None # None = use global LAN allowlist + + +def _client_ip(request: Request) -> str: + """Extract the client IP from a request. Tests monkeypatch this.""" + return request.client.host if request.client else "0.0.0.0" + + +def _ip_in_any(ip_str: str, cidrs: list[str]) -> bool: + try: + ip = ipaddress.ip_address(ip_str) + except ValueError: + return False + if ip.is_loopback: + return True + for cidr in cidrs: + try: + if ip in ipaddress.ip_network(cidr, strict=False): + return True + except ValueError: + continue + return False + + +def _const_eq(a: str, b: str) -> bool: + if len(a) != len(b): + return False + diff = 0 + for x, y in zip(a.encode(), b.encode()): + diff |= x ^ y + return diff == 0 + + +class Auth: + """Holds DB ref + global LAN CIDRs. Construct once at startup.""" + + def __init__(self, *, db: DB, lan_cidrs: list[str] | tuple[str, ...]): + self.db = db + self.lan_cidrs = list(lan_cidrs) + + # ---------- bootstrap --------------------------------------------------- + + def bootstrap_admin(self, admin_bearer_path: Path) -> str: + """Mint admin token if none exists, write plaintext bearer to disk + (chmod 600). Subsequent boots reuse the existing token. + + Returns the path-side bearer (read from disk) — not necessarily what + we just minted, since another process may have raced us. + """ + admin_bearer_path = Path(admin_bearer_path) + admin_bearer_path.parent.mkdir(parents=True, exist_ok=True) + existing = self.db.get_token(ADMIN_TOKEN_NAME) + + if existing is not None and admin_bearer_path.exists(): + return admin_bearer_path.read_text(encoding="utf-8").strip() + + if existing is not None: + # Token row exists but the file is gone — we cannot recover the + # plaintext (it was hashed at insert). Revoke and re-mint. + log.warning("admin token row exists but bearer file is missing; rotating") + self.db.revoke_token(ADMIN_TOKEN_NAME) + # Renaming the existing row would be cleaner, but revoke + new + # row keeps the audit trail of past admin tokens. + new_name = f"{ADMIN_TOKEN_NAME}-rotated-{int(__import__('time').time())}" + with self.db._conn() as c: + c.execute("UPDATE tokens SET name=? WHERE name=?", (new_name, ADMIN_TOKEN_NAME)) + + bearer = ADMIN_TOKEN_PREFIX + secrets.token_urlsafe(32) + self.db.insert_token( + name=ADMIN_TOKEN_NAME, + bearer=bearer, + is_admin=True, + ip_cidrs=None, + ) + admin_bearer_path.write_text(bearer + "\n", encoding="utf-8") + admin_bearer_path.chmod(0o600) + log.info("admin bearer written to %s (chmod 600)", admin_bearer_path) + return bearer + + # ---------- guards ------------------------------------------------------ + + def require_global_ip(self, request: Request) -> None: + ip = _client_ip(request) + if not _ip_in_any(ip, self.lan_cidrs): + raise HTTPException(403, f"ip not in LAN allowlist: {ip}") + + def require_app(self, request: Request, authorization: str | None) -> AppToken: + """Returns AppToken on success. Raises 401/403 on failure. + + We check the global LAN allowlist FIRST (cheap, doesn't touch DB) so + wide-area scanners don't even cause a token lookup. + """ + self.require_global_ip(request) + if not authorization or not authorization.startswith("Bearer "): + raise HTTPException(401, "missing bearer") + bearer = authorization[7:].strip() + if not bearer: + raise HTTPException(401, "empty bearer") + rec = self.db.lookup_token_by_bearer(bearer) + if rec is None: + # Note: do NOT echo the bearer back. Generic message. + raise HTTPException(403, "unknown or revoked token") + + # Per-token IP allowlist takes precedence over global LAN if set. + if rec["ip_cidrs"]: + ip = _client_ip(request) + if not _ip_in_any(ip, rec["ip_cidrs"]): + raise HTTPException(403, f"ip not in app allowlist: {ip}") + + return AppToken(name=rec["name"], is_admin=rec["is_admin"], ip_cidrs=rec["ip_cidrs"]) + + def require_admin(self, request: Request, authorization: str | None) -> AppToken: + tok = self.require_app(request, authorization) + if not tok.is_admin: + raise HTTPException(403, "admin auth failed") + return tok diff --git a/crafting_table/config.py b/crafting_table/config.py new file mode 100644 index 0000000..2e73774 --- /dev/null +++ b/crafting_table/config.py @@ -0,0 +1,59 @@ +"""Env-driven configuration. + +All settings flow through environment variables so the same image runs in +prod (compose.yml env_file) and tests (monkeypatched envs). No config files. +""" +from __future__ import annotations + +import os +from dataclasses import dataclass, field +from pathlib import Path + + +# Default LAN allowlist mirrors the rules baked into the network: anything +# inside RFC1918 plus loopback. Override with CRAFTING_LAN_CIDRS if a deploy +# wants stricter scoping. +DEFAULT_LAN_CIDRS = ( + "10.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + "127.0.0.0/8", + "::1/128", +) + + +@dataclass(frozen=True) +class Config: + db_path: Path + workspace_root: Path + log_dir: Path + admin_bearer_path: Path + max_concurrent_jobs: int + api_port: int + api_bind: str + default_job_timeout_secs: int + lan_cidrs: tuple[str, ...] + workspace_gc_interval_secs: int + workspace_gc_age_secs: int + + +def load() -> Config: + cidrs_raw = os.environ.get("CRAFTING_LAN_CIDRS", "").strip() + if cidrs_raw: + cidrs = tuple(c.strip() for c in cidrs_raw.split(",") if c.strip()) + else: + cidrs = DEFAULT_LAN_CIDRS + + return Config( + db_path=Path(os.environ.get("CRAFTING_DB", "/data/crafting.db")), + workspace_root=Path(os.environ.get("CRAFTING_WORKSPACE", "/workspace")), + log_dir=Path(os.environ.get("CRAFTING_LOG_DIR", "/data/jobs")), + admin_bearer_path=Path(os.environ.get("CRAFTING_ADMIN_BEARER", "/data/admin-bearer.txt")), + max_concurrent_jobs=int(os.environ.get("CRAFTING_MAX_CONCURRENT", "4")), + api_port=int(os.environ.get("CRAFTING_PORT", "8810")), + api_bind=os.environ.get("CRAFTING_BIND", "0.0.0.0"), + default_job_timeout_secs=int(os.environ.get("CRAFTING_DEFAULT_JOB_TIMEOUT", "1800")), + lan_cidrs=cidrs, + workspace_gc_interval_secs=int(os.environ.get("CRAFTING_GC_INTERVAL", "3600")), + workspace_gc_age_secs=int(os.environ.get("CRAFTING_GC_AGE", "86400")), + ) diff --git a/crafting_table/db.py b/crafting_table/db.py new file mode 100644 index 0000000..8676155 --- /dev/null +++ b/crafting_table/db.py @@ -0,0 +1,502 @@ +"""SQLite ledger + migrations. + +Why SQLite (not MariaDB like clawdforge): single-process, single-host service, +no need for cross-host replication. The runner is the only writer; every +HTTP worker reads. SQLite in WAL mode handles single-writer-many-readers +cleanly. Trade-off documented in README. + +Why stdlib `sqlite3` + `run_in_executor` (not aiosqlite): one less dependency +and the queries are tiny (fetchone / fetchall). The runner does its own log +streaming via aiofiles-equivalent so we never block the loop on disk. + +Migration system: +- Each entry in MIGRATIONS is (version_id, sql_text). Versions are date-tagged + so they sort lexicographically. +- Apply in order, INSERT OR IGNORE into schema_migrations to handle + multi-worker boot races (mirrors cauldron's pattern). +- Migrations are append-only; never edit a landed migration, add a new one. +""" +from __future__ import annotations + +import asyncio +import json +import sqlite3 +import time +from contextlib import contextmanager +from pathlib import Path + + +# fmt: off +MIGRATIONS: list[tuple[str, str]] = [ + ( + "001_schema_migrations", + """ + CREATE TABLE IF NOT EXISTS schema_migrations ( + version TEXT PRIMARY KEY, + applied_at INTEGER NOT NULL + ); + """, + ), + ( + "002_tokens", + """ + CREATE TABLE IF NOT EXISTS tokens ( + name TEXT PRIMARY KEY, + bearer_hash TEXT NOT NULL UNIQUE, + is_admin INTEGER NOT NULL DEFAULT 0, + ip_allowlist_json TEXT, + created_at INTEGER NOT NULL, + last_used_at INTEGER, + revoked_at INTEGER + ); + """, + ), + ( + "003_projects", + """ + CREATE TABLE IF NOT EXISTS projects ( + name TEXT PRIMARY KEY, + git_url TEXT NOT NULL, + default_branch TEXT NOT NULL DEFAULT 'main', + recipe_json TEXT NOT NULL, + owner_token TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + FOREIGN KEY (owner_token) REFERENCES tokens(name) + ); + """, + ), + ( + "004_jobs", + """ + CREATE TABLE IF NOT EXISTS jobs ( + id TEXT PRIMARY KEY, + project_name TEXT NOT NULL, + subproject_path TEXT NOT NULL, + recipe TEXT NOT NULL, + branch TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'queued', + queued_at INTEGER NOT NULL, + started_at INTEGER, + finished_at INTEGER, + exit_code INTEGER, + log_path TEXT NOT NULL, + findings_count INTEGER NOT NULL DEFAULT 0, + recipe_snapshot_json TEXT NOT NULL, + FOREIGN KEY (project_name) REFERENCES projects(name) ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS idx_jobs_project ON jobs(project_name); + CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status); + CREATE INDEX IF NOT EXISTS idx_jobs_queued_at ON jobs(queued_at); + """, + ), + ( + "005_findings", + """ + CREATE TABLE IF NOT EXISTS findings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id TEXT NOT NULL, + kind TEXT NOT NULL, + severity TEXT NOT NULL, + file TEXT, + line INTEGER, + code TEXT, + message TEXT NOT NULL, + suggested_fix TEXT, + raw_json TEXT, + created_at INTEGER NOT NULL, + fingerprint TEXT NOT NULL, + FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS idx_findings_job ON findings(job_id); + CREATE INDEX IF NOT EXISTS idx_findings_fingerprint ON findings(fingerprint); + """, + ), +] +# fmt: on + + +def _hash(token: str) -> str: + import hashlib + + return hashlib.sha256(token.encode("utf-8")).hexdigest() + + +class DB: + """Synchronous SQLite wrapper. Async API methods wrap calls with + run_in_executor so callers in the FastAPI loop stay non-blocking. + """ + + def __init__(self, db_path: str | Path): + self.db_path = str(db_path) + Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) + with self._conn() as c: + # WAL = many readers + one writer without lock contention. + # synchronous=NORMAL is the standard WAL pairing — durability + # against process crash is preserved; only OS crash can drop + # the most recent commit. + c.execute("PRAGMA journal_mode=WAL") + c.execute("PRAGMA synchronous=NORMAL") + c.execute("PRAGMA foreign_keys=ON") + self.migrate() + + @contextmanager + def _conn(self): + # isolation_level=None gives us autocommit; we wrap multi-stmt + # operations in BEGIN / COMMIT explicitly when we need atomicity. + conn = sqlite3.connect(self.db_path, isolation_level=None, timeout=10.0) + conn.row_factory = sqlite3.Row + try: + conn.execute("PRAGMA foreign_keys=ON") + yield conn + finally: + conn.close() + + # ---------- migrations --------------------------------------------------- + + def migrate(self) -> list[str]: + """Apply any pending migrations. Returns the list of versions applied + on this call (empty if up-to-date). Idempotent + race-safe.""" + applied: list[str] = [] + with self._conn() as c: + # Migration 001 must run first since it creates the tracking table. + # Use IF NOT EXISTS in every CREATE so repeat runs are no-ops. + for version, sql in MIGRATIONS: + c.executescript(sql) + cur = c.execute( + "INSERT OR IGNORE INTO schema_migrations (version, applied_at) VALUES (?, ?)", + (version, int(time.time())), + ) + if cur.rowcount == 1: + applied.append(version) + return applied + + def applied_migrations(self) -> list[str]: + with self._conn() as c: + rows = c.execute( + "SELECT version FROM schema_migrations ORDER BY version" + ).fetchall() + return [r["version"] for r in rows] + + # ---------- tokens ------------------------------------------------------- + + def insert_token( + self, + *, + name: str, + bearer: str, + is_admin: bool, + ip_cidrs: list[str] | None, + ) -> None: + ip_json = json.dumps(ip_cidrs) if ip_cidrs else None + with self._conn() as c: + c.execute( + """ + INSERT INTO tokens (name, bearer_hash, is_admin, ip_allowlist_json, created_at) + VALUES (?, ?, ?, ?, ?) + """, + (name, _hash(bearer), 1 if is_admin else 0, ip_json, int(time.time())), + ) + + def lookup_token_by_bearer(self, bearer: str) -> dict | None: + h = _hash(bearer) + with self._conn() as c: + row = c.execute( + """ + SELECT name, is_admin, ip_allowlist_json, revoked_at + FROM tokens WHERE bearer_hash=? + """, + (h,), + ).fetchone() + if not row: + return None + if row["revoked_at"] is not None: + return None + c.execute("UPDATE tokens SET last_used_at=? WHERE bearer_hash=?", (int(time.time()), h)) + ip_cidrs = json.loads(row["ip_allowlist_json"]) if row["ip_allowlist_json"] else None + return { + "name": row["name"], + "is_admin": bool(row["is_admin"]), + "ip_cidrs": ip_cidrs, + } + + def get_token(self, name: str) -> dict | None: + with self._conn() as c: + row = c.execute( + """ + SELECT name, is_admin, ip_allowlist_json, created_at, last_used_at, revoked_at + FROM tokens WHERE name=? + """, + (name,), + ).fetchone() + if not row: + return None + d = dict(row) + d["is_admin"] = bool(d["is_admin"]) + d["ip_cidrs"] = json.loads(d.pop("ip_allowlist_json")) if d["ip_allowlist_json"] else None + return d + + def list_tokens(self) -> list[dict]: + with self._conn() as c: + rows = c.execute( + """ + SELECT name, is_admin, ip_allowlist_json, created_at, last_used_at, revoked_at + FROM tokens ORDER BY name + """, + ).fetchall() + out = [] + for r in rows: + d = dict(r) + d["is_admin"] = bool(d["is_admin"]) + d["ip_cidrs"] = json.loads(d.pop("ip_allowlist_json")) if d["ip_allowlist_json"] else None + out.append(d) + return out + + def revoke_token(self, name: str) -> bool: + with self._conn() as c: + cur = c.execute( + "UPDATE tokens SET revoked_at=? WHERE name=? AND revoked_at IS NULL", + (int(time.time()), name), + ) + return cur.rowcount > 0 + + # ---------- projects ----------------------------------------------------- + + def upsert_project( + self, + *, + name: str, + git_url: str, + default_branch: str, + recipe_json: str, + owner_token: str, + ) -> dict: + now = int(time.time()) + with self._conn() as c: + row = c.execute("SELECT created_at, owner_token FROM projects WHERE name=?", (name,)).fetchone() + if row is None: + c.execute( + """ + INSERT INTO projects (name, git_url, default_branch, recipe_json, owner_token, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + (name, git_url, default_branch, recipe_json, owner_token, now, now), + ) + created_at = now + else: + created_at = row["created_at"] + c.execute( + """ + UPDATE projects + SET git_url=?, default_branch=?, recipe_json=?, updated_at=? + WHERE name=? + """, + (git_url, default_branch, recipe_json, now, name), + ) + return { + "name": name, + "git_url": git_url, + "default_branch": default_branch, + "recipe_json": recipe_json, + "owner_token": owner_token, + "created_at": created_at, + "updated_at": now, + } + + def get_project(self, name: str) -> dict | None: + with self._conn() as c: + row = c.execute( + "SELECT * FROM projects WHERE name=?", + (name,), + ).fetchone() + return dict(row) if row else None + + def list_projects(self, *, owner_token: str | None = None) -> list[dict]: + with self._conn() as c: + if owner_token is None: + rows = c.execute("SELECT * FROM projects ORDER BY name").fetchall() + else: + rows = c.execute( + "SELECT * FROM projects WHERE owner_token=? ORDER BY name", + (owner_token,), + ).fetchall() + return [dict(r) for r in rows] + + def delete_project(self, name: str) -> bool: + with self._conn() as c: + cur = c.execute("DELETE FROM projects WHERE name=?", (name,)) + return cur.rowcount > 0 + + # ---------- jobs --------------------------------------------------------- + + def insert_job( + self, + *, + job_id: str, + project_name: str, + subproject_path: str, + recipe: str, + branch: str, + log_path: str, + recipe_snapshot_json: str, + ) -> dict: + now = int(time.time()) + with self._conn() as c: + c.execute( + """ + INSERT INTO jobs (id, project_name, subproject_path, recipe, branch, + status, queued_at, log_path, recipe_snapshot_json) + VALUES (?, ?, ?, ?, ?, 'queued', ?, ?, ?) + """, + (job_id, project_name, subproject_path, recipe, branch, now, log_path, recipe_snapshot_json), + ) + return { + "id": job_id, + "project_name": project_name, + "subproject_path": subproject_path, + "recipe": recipe, + "branch": branch, + "status": "queued", + "queued_at": now, + "started_at": None, + "finished_at": None, + "exit_code": None, + "log_path": log_path, + "findings_count": 0, + } + + def get_job(self, job_id: str) -> dict | None: + with self._conn() as c: + row = c.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone() + return dict(row) if row else None + + def list_jobs( + self, + *, + project_name: str | None = None, + status: str | None = None, + owner_token: str | None = None, + limit: int = 50, + ) -> list[dict]: + sql = """ + SELECT j.* FROM jobs j + JOIN projects p ON p.name = j.project_name + WHERE 1=1 + """ + params: list = [] + if project_name is not None: + sql += " AND j.project_name=?" + params.append(project_name) + if status is not None: + sql += " AND j.status=?" + params.append(status) + if owner_token is not None: + sql += " AND p.owner_token=?" + params.append(owner_token) + sql += " ORDER BY j.queued_at DESC LIMIT ?" + params.append(int(limit)) + with self._conn() as c: + rows = c.execute(sql, params).fetchall() + return [dict(r) for r in rows] + + def mark_job_running(self, job_id: str) -> None: + with self._conn() as c: + c.execute( + "UPDATE jobs SET status='running', started_at=? WHERE id=? AND status='queued'", + (int(time.time()), job_id), + ) + + def mark_job_finished( + self, + *, + job_id: str, + status: str, + exit_code: int | None, + ) -> None: + with self._conn() as c: + c.execute( + """ + UPDATE jobs + SET status=?, finished_at=?, exit_code=? + WHERE id=? + """, + (status, int(time.time()), exit_code, job_id), + ) + + def mark_orphaned_jobs_failed(self, *, log_dir: Path | None = None) -> list[str]: + """Sweep on boot — any job in 'running' state was orphaned by a process + crash. Mark them failed with exit_code=-1 so callers see the terminal + state and can re-queue if they want.""" + ids: list[str] = [] + now = int(time.time()) + with self._conn() as c: + rows = c.execute("SELECT id, log_path FROM jobs WHERE status='running'").fetchall() + for r in rows: + ids.append(r["id"]) + c.execute( + "UPDATE jobs SET status='failed', finished_at=?, exit_code=-1 WHERE id=?", + (now, r["id"]), + ) + if log_dir is not None: + try: + Path(r["log_path"]).parent.mkdir(parents=True, exist_ok=True) + with open(r["log_path"], "a", encoding="utf-8") as fh: + fh.write("\n[crafting-table] runner restart, job orphaned\n") + except OSError: + pass + return ids + + def increment_findings_count(self, job_id: str, n: int) -> None: + with self._conn() as c: + c.execute( + "UPDATE jobs SET findings_count = findings_count + ? WHERE id=?", + (n, job_id), + ) + + # ---------- findings ----------------------------------------------------- + + def insert_finding( + self, + *, + job_id: str, + kind: str, + severity: str, + message: str, + fingerprint: str, + file: str | None = None, + line: int | None = None, + code: str | None = None, + suggested_fix: str | None = None, + raw_json: str | None = None, + ) -> int: + with self._conn() as c: + cur = c.execute( + """ + INSERT INTO findings (job_id, kind, severity, file, line, code, message, + suggested_fix, raw_json, created_at, fingerprint) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + (job_id, kind, severity, file, line, code, message, + suggested_fix, raw_json, int(time.time()), fingerprint), + ) + return cur.lastrowid + + def list_findings(self, job_id: str) -> list[dict]: + with self._conn() as c: + rows = c.execute( + "SELECT * FROM findings WHERE job_id=? ORDER BY id", + (job_id,), + ).fetchall() + return [dict(r) for r in rows] + + # ---------- async wrappers ---------------------------------------------- + + async def arun(self, fn, *args, **kwargs): + """Run a sync DB method in the default executor. + + Use from FastAPI request handlers / runner coroutines so we don't + block the event loop on disk I/O. Most of these queries are <1ms but + the pattern stays consistent for when we add bigger ones later. + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, lambda: fn(*args, **kwargs)) diff --git a/crafting_table/models.py b/crafting_table/models.py new file mode 100644 index 0000000..48b45c4 --- /dev/null +++ b/crafting_table/models.py @@ -0,0 +1,111 @@ +"""Pydantic schemas for projects, recipes, jobs, findings. + +All wire shapes — what HTTP request bodies look like and what the API returns. +The DB stores Project minus the name (which is the row PK) as recipe_json so +recipe drift is visible per-job (jobs snapshot their recipe at run-time). +""" +from __future__ import annotations + +from typing import Literal + +from pydantic import BaseModel, Field + + +# Slug pattern shared between project names and token names — lowercase +# alphanumerics + hyphen + underscore, must start with alphanumeric. +SLUG_PATTERN = r"^[a-z0-9][a-z0-9_-]*$" + + +class Subproject(BaseModel): + """One language target inside a repo. A project has one or more.""" + + path: str = "." + language: str + build: str | None = None + test: str | None = None + lint: str | None = None + audit: str | None = None + timeout_secs: int = Field(default=1800, ge=1, le=86400) + + +class Schedule(BaseModel): + """Cron-style schedules per recipe kind. 'manual' = caller-driven only. + + Wave 1 doesn't run the scheduler yet — these strings are persisted but the + sweeper that consumes them lands in a later wave. Stored as-is. + """ + + audit: str | None = None + test: str | None = None + build: str | None = None + lint: str | None = None + + +class Notify(BaseModel): + email: list[str] = Field(default_factory=list) + on: list[str] = Field(default_factory=lambda: ["audit_fail", "cve_found", "patch_drafted"]) + auto_patch: bool = False + + +class Project(BaseModel): + """Full project shape — what the API accepts on POST /projects. + + `created_at` and `updated_at` are server-stamped on insert/update; if the + caller supplies them we ignore the values and use server time. + """ + + name: str = Field(pattern=SLUG_PATTERN, min_length=1, max_length=64) + git_url: str = Field(min_length=1) + default_branch: str = "main" + languages: list[str] = Field(default_factory=list) + subprojects: list[Subproject] = Field(default_factory=list) + schedule: Schedule = Field(default_factory=Schedule) + notify: Notify = Field(default_factory=Notify) + created_at: int = 0 + updated_at: int = 0 + + +class CreateJobRequest(BaseModel): + recipe: Literal["build", "test", "lint", "audit"] + subproject: str | None = None + branch: str | None = None + + +class Job(BaseModel): + """API view of a job row.""" + + id: str + project_name: str + subproject_path: str + recipe: str + branch: str + status: Literal["queued", "running", "succeeded", "failed", "timed_out", "cancelled"] + queued_at: int + started_at: int | None = None + finished_at: int | None = None + exit_code: int | None = None + log_path: str + findings_count: int = 0 + + +class TokenCreateRequest(BaseModel): + name: str = Field(pattern=SLUG_PATTERN, min_length=1, max_length=64) + is_admin: bool = False + ip_cidrs: list[str] = Field(default_factory=list) + + +class Finding(BaseModel): + """One structured finding from a parser. Wave 1 ships the schema; wave 2 + actually populates these from cargo/clippy/ruff/etc. JSON output.""" + + id: int + job_id: str + kind: str + severity: str + file: str | None = None + line: int | None = None + code: str | None = None + message: str + suggested_fix: str | None = None + fingerprint: str + created_at: int diff --git a/crafting_table/runner.py b/crafting_table/runner.py new file mode 100644 index 0000000..4ad2914 --- /dev/null +++ b/crafting_table/runner.py @@ -0,0 +1,408 @@ +"""Async job runner — bounded asyncio pool that materializes workspaces and +runs recipe shell commands. + +Lifecycle: +1. server.lifespan calls `runner.start()`: + - mark any 'running' jobs from a previous process as failed (orphaned) + - kick off the dispatcher loop + workspace gc loop +2. POST /projects//jobs: + - inserts a row in `jobs` (status=queued) + - calls `runner.enqueue(job_id)` — fast, just puts the id on a queue +3. dispatcher pulls ids off the queue, acquires a semaphore slot, + spawns `_run_job` — bounded by `max_concurrent`. +4. _run_job: + a. mark running + b. materialize workspace + c. exec recipe via /bin/sh -c + d. stream stdout+stderr to log file (live) + e. enforce per-job timeout + f. mark terminal status + exit code + g. emit a `jobs_finished` event hook for wave-2 parsers / wave-8 digest +5. server.lifespan stop() drains in-flight tasks then closes. + +Concurrency: asyncio.Semaphore(max_concurrent) caps in-flight subprocess +runs. The queue itself is unbounded — back-pressure is enforced by the +semaphore + caller can poll job status to know when to enqueue more. + +Recipe security: shell strings are run via `create_subprocess_shell` (which +uses /bin/sh -c). Admins set them; this is documented loud in README. +""" +from __future__ import annotations + +import asyncio +import json +import logging +import os +import signal +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Awaitable, Callable + +from .db import DB +from .workspace import WorkspaceManager, WorkspacePaths + + +log = logging.getLogger("crafting_table.runner") + + +# Hook signature: called after every job reaches a terminal state. +# Wave 2 wires this to the parser pipeline; wave 8 to the email digest queue. +JobFinishedHook = Callable[[dict], Awaitable[None]] + + +@dataclass +class _JobContext: + job_id: str + job: dict + project: dict + recipe: dict + subproject: dict + + +class Runner: + def __init__( + self, + *, + db: DB, + workspace: WorkspaceManager, + log_dir: Path, + max_concurrent: int = 4, + default_timeout_secs: int = 1800, + gc_interval_secs: int = 3600, + gc_age_secs: int = 86400, + ): + self.db = db + self.workspace = workspace + self.log_dir = Path(log_dir) + self.log_dir.mkdir(parents=True, exist_ok=True) + self.max_concurrent = max_concurrent + self.default_timeout_secs = default_timeout_secs + self.gc_interval_secs = gc_interval_secs + self.gc_age_secs = gc_age_secs + + self.queue: asyncio.Queue[str] = asyncio.Queue() + self.semaphore = asyncio.Semaphore(max_concurrent) + self._tasks: set[asyncio.Task] = set() + self._dispatcher_task: asyncio.Task | None = None + self._gc_task: asyncio.Task | None = None + self._stopping = False + self._hooks: list[JobFinishedHook] = [] + + # Test introspection — lets test_runner assert on bounded-concurrency. + self.in_flight = 0 + self.peak_in_flight = 0 + + # ---------- lifecycle --------------------------------------------------- + + def add_hook(self, hook: JobFinishedHook) -> None: + self._hooks.append(hook) + + async def start(self) -> None: + # Recover orphaned 'running' jobs from a previous process — mark them + # failed with exit_code=-1 and a synthetic log line. We do NOT try to + # resume a job mid-execution; recipe state could be partial. + orphaned = await self.db.arun( + self.db.mark_orphaned_jobs_failed, log_dir=self.log_dir + ) + if orphaned: + log.warning("marked %d orphaned running job(s) failed: %s", len(orphaned), orphaned) + + self._dispatcher_task = asyncio.create_task(self._dispatch_loop()) + self._gc_task = asyncio.create_task(self._gc_loop()) + + async def stop(self) -> None: + self._stopping = True + if self._dispatcher_task is not None: + self._dispatcher_task.cancel() + try: + await self._dispatcher_task + except asyncio.CancelledError: + pass + if self._gc_task is not None: + self._gc_task.cancel() + try: + await self._gc_task + except asyncio.CancelledError: + pass + # Cancel any in-flight job tasks. Recipes will see SIGTERM via the + # asyncio cancellation chain on the subprocess transport. + for t in list(self._tasks): + t.cancel() + for t in list(self._tasks): + try: + await t + except (asyncio.CancelledError, Exception): + pass + + # ---------- enqueue ----------------------------------------------------- + + async def enqueue(self, job_id: str) -> None: + await self.queue.put(job_id) + + def stats(self) -> dict: + return { + "queued": self.queue.qsize(), + "running": self.in_flight, + "max": self.max_concurrent, + "peak": self.peak_in_flight, + } + + # ---------- dispatcher -------------------------------------------------- + + async def _dispatch_loop(self) -> None: + try: + while not self._stopping: + job_id = await self.queue.get() + # Acquire BEFORE spawning the task so we naturally block when + # the pool is full instead of building up an unbounded set of + # tasks that all immediately await the semaphore. + await self.semaphore.acquire() + if self._stopping: + self.semaphore.release() + break + t = asyncio.create_task(self._wrap_run(job_id)) + self._tasks.add(t) + t.add_done_callback(self._tasks.discard) + except asyncio.CancelledError: + raise + + async def _wrap_run(self, job_id: str) -> None: + self.in_flight += 1 + if self.in_flight > self.peak_in_flight: + self.peak_in_flight = self.in_flight + try: + await self._run_job(job_id) + except Exception as e: + log.exception("runner: unhandled error for job %s: %s", job_id, e) + finally: + self.in_flight -= 1 + self.semaphore.release() + + # ---------- gc loop ----------------------------------------------------- + + async def _gc_loop(self) -> None: + try: + while not self._stopping: + await asyncio.sleep(self.gc_interval_secs) + try: + res = await self.workspace.gc(age_secs=self.gc_age_secs) + if res["removed"]: + log.info("workspace gc: %s", res) + except Exception as e: + log.warning("workspace gc failed: %s", e) + except asyncio.CancelledError: + raise + + # ---------- core -------------------------------------------------------- + + async def _run_job(self, job_id: str) -> None: + ctx = await self._load_context(job_id) + if ctx is None: + return + + await self.db.arun(self.db.mark_job_running, job_id) + + log_path = Path(ctx.job["log_path"]) + log_path.parent.mkdir(parents=True, exist_ok=True) + + recipe_kind = ctx.job["recipe"] + cmd_str = ctx.subproject.get(recipe_kind) + timeout = int(ctx.subproject.get("timeout_secs") or self.default_timeout_secs) + + terminal_status = "succeeded" + exit_code: int | None = None + + with log_path.open("w", encoding="utf-8") as log_fh: + log_fh.write(f"[crafting-table] job {job_id}\n") + log_fh.write(f"[crafting-table] project={ctx.job['project_name']} subproject={ctx.job['subproject_path']}\n") + log_fh.write(f"[crafting-table] recipe={recipe_kind} branch={ctx.job['branch']}\n") + log_fh.write(f"[crafting-table] cmd={cmd_str!r} timeout={timeout}s\n") + log_fh.flush() + + if not cmd_str: + log_fh.write(f"[crafting-table] subproject has no '{recipe_kind}' command\n") + terminal_status = "failed" + exit_code = -2 + else: + try: + paths = await self.workspace.materialize( + project=ctx.job["project_name"], + job_id=job_id, + git_url=ctx.project["git_url"], + branch=ctx.job["branch"], + log_fh=log_fh, + ) + except Exception as e: + log_fh.write(f"[crafting-table] workspace error: {e}\n") + terminal_status = "failed" + exit_code = -3 + else: + sub_path = ctx.subproject.get("path", ".") + work_dir = paths.worktree_dir / sub_path + + log_fh.write(f"[crafting-table] cwd={work_dir}\n") + log_fh.write("[crafting-table] --- recipe output begin ---\n") + log_fh.flush() + try: + exit_code, timed_out = await self._exec_recipe( + cmd=cmd_str, cwd=str(work_dir), log_fh=log_fh, timeout=timeout + ) + if timed_out: + terminal_status = "timed_out" + elif exit_code == 0: + terminal_status = "succeeded" + else: + terminal_status = "failed" + except asyncio.CancelledError: + log_fh.write("[crafting-table] cancelled\n") + terminal_status = "cancelled" + exit_code = -4 + # Re-raise so the dispatcher's task tracking sees cancellation. + await self.db.arun( + self.db.mark_job_finished, + job_id=job_id, + status=terminal_status, + exit_code=exit_code, + ) + await self.workspace.cleanup(paths) + raise + log_fh.write(f"[crafting-table] --- recipe output end (exit={exit_code}) ---\n") + log_fh.flush() + + await self.workspace.cleanup(paths) + + await self.db.arun( + self.db.mark_job_finished, + job_id=job_id, + status=terminal_status, + exit_code=exit_code, + ) + + # Hook fan-out — wave 2 parsers + wave 8 digest hook into this. + finished_event = { + "job_id": job_id, + "project_name": ctx.job["project_name"], + "subproject_path": ctx.job["subproject_path"], + "recipe": recipe_kind, + "status": terminal_status, + "exit_code": exit_code, + "log_path": str(log_path), + "finished_at": int(time.time()), + } + for hook in self._hooks: + try: + await hook(finished_event) + except Exception as e: + log.warning("jobs_finished hook failed: %s", e) + + async def _exec_recipe( + self, *, cmd: str, cwd: str, log_fh, timeout: int + ) -> tuple[int, bool]: + """Run cmd via /bin/sh -c, stream output to log_fh, return (exit, timed_out). + + Uses create_subprocess_shell because recipe strings are operator-trusted + shell expressions (e.g. `cargo build && cargo test`). Stdout+stderr + merged into one stream to preserve interleaving order, which matters + for log readability. + + Important asyncio detail: we wrap proc.wait() in a single task and + gate timeout with asyncio.wait() rather than wait_for(). wait_for + cancels the underlying coroutine on timeout, which on Python 3.11 + marks the proc.wait() future as cancelled — so a SECOND wait_for on + the same proc would immediately raise CancelledError instead of + returning the post-terminate exit code. Wrapping once with a + long-lived task lets us await it twice cleanly. + """ + # start_new_session=True puts the shell in its own process group so + # we can signal the WHOLE group on timeout. Without this, terminate() + # only signals the shell; long-running children (sleep, cargo build, + # etc.) inherit init and keep stdout open, so the pump never EOFs. + proc = await asyncio.create_subprocess_shell( + cmd, + cwd=cwd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + start_new_session=True, + ) + assert proc.stdout is not None + pgid = proc.pid # equals the new session/group id since we just created it + + def _kill_group(sig: int) -> None: + try: + os.killpg(pgid, sig) + except ProcessLookupError: + pass + + async def pump() -> None: + while True: + line = await proc.stdout.readline() + if not line: + break + log_fh.write(line.decode("utf-8", "replace")) + log_fh.flush() + + pump_task = asyncio.create_task(pump()) + wait_task = asyncio.create_task(proc.wait()) + + timed_out = False + # Phase 1 — give the process up to `timeout` seconds to finish + # naturally. + done, _ = await asyncio.wait({wait_task}, timeout=timeout) + if not done: + timed_out = True + log_fh.write(f"\n[crafting-table] timeout after {timeout}s — terminating\n") + log_fh.flush() + _kill_group(signal.SIGTERM) + # Phase 2 — graceful shutdown grace period after SIGTERM + done, _ = await asyncio.wait({wait_task}, timeout=10) + if not done: + # Phase 3 — escalate to SIGKILL on the group + log_fh.write("[crafting-table] grace expired — SIGKILL\n") + log_fh.flush() + _kill_group(signal.SIGKILL) + await wait_task + # wait_task is now done — pull the rc out + rc = wait_task.result() + + # Drain pump. With process-group kill stdout will EOF cleanly; + # the wait_for guard is belt-and-braces against any orphan that + # somehow survived (e.g. a child that escaped its group). + try: + await asyncio.wait_for(pump_task, timeout=2) + except (asyncio.TimeoutError, Exception): + pump_task.cancel() + try: + await pump_task + except (asyncio.CancelledError, Exception): + pass + + return int(rc), timed_out + + # ---------- helpers ----------------------------------------------------- + + async def _load_context(self, job_id: str) -> _JobContext | None: + job = await self.db.arun(self.db.get_job, job_id) + if job is None: + log.warning("runner: job %s vanished before dispatch", job_id) + return None + recipe = json.loads(job["recipe_snapshot_json"]) + # subproject inside the snapshot + subprojects = recipe.get("subprojects", []) + match = None + for s in subprojects: + if s.get("path") == job["subproject_path"]: + match = s + break + if match is None: + # Fallback to the first one — should never happen since we + # validate at enqueue time. + match = subprojects[0] if subprojects else {} + project = await self.db.arun(self.db.get_project, job["project_name"]) + if project is None: + # Project was deleted while job sat in queue. The FK cascade in + # the schema would have nuked the job row too, but we may have + # popped the id off the queue before the cascade landed. + log.warning("runner: project for job %s gone", job_id) + return None + return _JobContext(job_id=job_id, job=job, project=project, recipe=recipe, subproject=match) diff --git a/crafting_table/server.py b/crafting_table/server.py new file mode 100644 index 0000000..602ce90 --- /dev/null +++ b/crafting_table/server.py @@ -0,0 +1,484 @@ +"""FastAPI app — port 8810. The HTTP surface for crafting-table. + +Authentication model: +- Every request needs `Authorization: Bearer `. +- The bearer is hashed and looked up in the tokens table. +- Tokens are flagged is_admin=1 or 0. Admin can do everything. +- Per-app tokens (is_admin=0) can register projects (becoming the owner) + and only see/touch projects where owner_token matches their name. +- Cross-token project access returns 404 (NOT 403) — same existence-leak + guard clawdforge uses for sessions. + +Endpoints: +- GET /healthz — public-ish (still needs LAN IP) +- POST /admin/tokens — admin only +- GET /admin/tokens — admin only +- DELETE /admin/tokens/{name} — admin only +- POST /projects — any token (becomes owner) +- GET /projects — caller's projects (or all if admin) +- GET /projects/{name} — visibility-gated, 404 on cross-token +- PUT /projects/{name} — owner or admin only +- DELETE /projects/{name} — owner or admin only; cascades jobs+findings +- POST /projects/{name}/jobs — visibility-gated; enqueues a job +- GET /jobs — caller's jobs (or all if admin) +- GET /jobs/{id} — owner or admin; returns last 200 log lines +- GET /jobs/{id}/log — owner or admin; full log file stream +- GET /jobs/{id}/findings — owner or admin; empty list in wave 1 +""" +from __future__ import annotations + +import asyncio +import json +import logging +import time +import uuid +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Annotated + +from fastapi import FastAPI, Header, HTTPException, Request +from fastapi.responses import FileResponse, JSONResponse + +from .auth import Auth, AppToken +from .config import Config, load +from .db import DB +from .models import ( + CreateJobRequest, + Project, + TokenCreateRequest, +) +from .runner import Runner +from .workspace import WorkspaceManager + + +log = logging.getLogger("crafting_table") +if not log.handlers: + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s") + + +# ---------- module-level singletons (rebuilt per test via fixture) ---------- + +cfg: Config = load() +db: DB = DB(cfg.db_path) +auth: Auth = Auth(db=db, lan_cidrs=cfg.lan_cidrs) +workspace: WorkspaceManager = WorkspaceManager(cfg.workspace_root) +runner: Runner = Runner( + db=db, + workspace=workspace, + log_dir=cfg.log_dir, + max_concurrent=cfg.max_concurrent_jobs, + default_timeout_secs=cfg.default_job_timeout_secs, + gc_interval_secs=cfg.workspace_gc_interval_secs, + gc_age_secs=cfg.workspace_gc_age_secs, +) + + +# ---------- lifespan -------------------------------------------------------- + + +@asynccontextmanager +async def _lifespan(app: FastAPI): + auth.bootstrap_admin(cfg.admin_bearer_path) + await runner.start() + log.info( + "crafting-table startup: db=%s log_dir=%s max_concurrent=%d port=%d", + cfg.db_path, cfg.log_dir, cfg.max_concurrent_jobs, cfg.api_port, + ) + try: + yield + finally: + await runner.stop() + log.info("crafting-table shutdown complete") + + +app = FastAPI(title="crafting-table", version="0.1.0", lifespan=_lifespan) + + +# ---------- helpers --------------------------------------------------------- + + +def _project_visible(project_row: dict | None, token: AppToken) -> dict: + """Return the project row if visible to this token, else raise 404. + + Existence-leak guard: cross-token access yields the same 404 a missing + project would. + """ + if project_row is None: + raise HTTPException(404, "project not found") + if token.is_admin: + return project_row + if project_row["owner_token"] == token.name: + return project_row + raise HTTPException(404, "project not found") + + +def _job_visible(job_row: dict | None, token: AppToken) -> dict: + if job_row is None: + raise HTTPException(404, "job not found") + if token.is_admin: + return job_row + project_row = db.get_project(job_row["project_name"]) + if project_row is None or project_row["owner_token"] != token.name: + raise HTTPException(404, "job not found") + return job_row + + +def _project_to_api(row: dict) -> dict: + """Inflate a DB row + recipe_json into the API-shaped Project dict.""" + recipe = json.loads(row["recipe_json"]) + recipe["name"] = row["name"] + recipe["git_url"] = row["git_url"] + recipe["default_branch"] = row["default_branch"] + recipe["created_at"] = row["created_at"] + recipe["updated_at"] = row["updated_at"] + return recipe + + +def _project_recipe_blob(p: Project) -> str: + """Serialize the parts of Project we store inside recipe_json (omit the + fields that get their own columns: name, git_url, default_branch, + created_at, updated_at).""" + return json.dumps({ + "languages": p.languages, + "subprojects": [s.model_dump() for s in p.subprojects], + "schedule": p.schedule.model_dump(), + "notify": p.notify.model_dump(), + }) + + +# ---------- endpoints ------------------------------------------------------- + + +@app.get("/healthz") +async def healthz(request: Request): + auth.require_global_ip(request) + # Cheap liveness — DB query that exercises the connection. + try: + await db.arun(db.applied_migrations) + db_ok = True + except Exception as e: + db_ok = False + log.warning("healthz db check failed: %s", e) + return { + "ok": True, + "db": "ok" if db_ok else "fail", + "runner": runner.stats(), + "version": "0.1.0", + } + + +# ---- /admin/tokens --------------------------------------------------------- + + +@app.post("/admin/tokens") +async def admin_create_token( + request: Request, + body: TokenCreateRequest, + authorization: Annotated[str | None, Header()] = None, +): + auth.require_admin(request, authorization) + import secrets as _s + bearer = ("ct_" if not body.is_admin else "ctadmin_") + _s.token_urlsafe(32) + try: + await db.arun( + db.insert_token, + name=body.name, + bearer=bearer, + is_admin=body.is_admin, + ip_cidrs=body.ip_cidrs or None, + ) + except Exception as e: + # UNIQUE-violation, etc. Don't leak DB internals. + raise HTTPException(409, f"token create failed: {type(e).__name__}") + return { + "ok": True, + "name": body.name, + "bearer": bearer, + "is_admin": body.is_admin, + "ip_cidrs": body.ip_cidrs, + } + + +@app.get("/admin/tokens") +async def admin_list_tokens( + request: Request, + authorization: Annotated[str | None, Header()] = None, +): + auth.require_admin(request, authorization) + rows = await db.arun(db.list_tokens) + return {"ok": True, "tokens": rows} + + +@app.delete("/admin/tokens/{name}") +async def admin_revoke_token( + name: str, + request: Request, + authorization: Annotated[str | None, Header()] = None, +): + auth.require_admin(request, authorization) + if name == "admin": + raise HTTPException(400, "cannot revoke the admin token via API") + revoked = await db.arun(db.revoke_token, name) + if not revoked: + raise HTTPException(404, "token not found or already revoked") + return {"ok": True} + + +# ---- /projects ------------------------------------------------------------- + + +@app.post("/projects") +async def register_project( + request: Request, + body: Project, + authorization: Annotated[str | None, Header()] = None, +): + tok = auth.require_app(request, authorization) + existing = await db.arun(db.get_project, body.name) + if existing is not None: + # Cross-token registration of the same name is treated as a 409 even + # for admin — admin who wants to take over should DELETE then re-POST, + # or PUT. + if not tok.is_admin and existing["owner_token"] != tok.name: + # 404, not 409 — don't leak that the name is taken under a + # different token. + raise HTTPException(404, "project not found") + raise HTTPException(409, "project already exists; use PUT to update") + row = await db.arun( + db.upsert_project, + name=body.name, + git_url=body.git_url, + default_branch=body.default_branch, + recipe_json=_project_recipe_blob(body), + owner_token=tok.name, + ) + return {"ok": True, "project": _project_to_api(row)} + + +@app.put("/projects/{name}") +async def update_project( + name: str, + request: Request, + body: Project, + authorization: Annotated[str | None, Header()] = None, +): + tok = auth.require_app(request, authorization) + existing = await db.arun(db.get_project, name) + _project_visible(existing, tok) + if body.name != name: + raise HTTPException(400, "name in body must match path") + row = await db.arun( + db.upsert_project, + name=name, + git_url=body.git_url, + default_branch=body.default_branch, + recipe_json=_project_recipe_blob(body), + owner_token=existing["owner_token"], + ) + return {"ok": True, "project": _project_to_api(row)} + + +@app.delete("/projects/{name}") +async def delete_project( + name: str, + request: Request, + authorization: Annotated[str | None, Header()] = None, +): + tok = auth.require_app(request, authorization) + existing = await db.arun(db.get_project, name) + _project_visible(existing, tok) + deleted = await db.arun(db.delete_project, name) + if not deleted: + raise HTTPException(404, "project not found") + return {"ok": True} + + +@app.get("/projects") +async def list_projects( + request: Request, + authorization: Annotated[str | None, Header()] = None, +): + tok = auth.require_app(request, authorization) + owner = None if tok.is_admin else tok.name + rows = await db.arun(db.list_projects, owner_token=owner) + return {"ok": True, "projects": [_project_to_api(r) for r in rows]} + + +@app.get("/projects/{name}") +async def get_project( + name: str, + request: Request, + authorization: Annotated[str | None, Header()] = None, +): + tok = auth.require_app(request, authorization) + row = await db.arun(db.get_project, name) + _project_visible(row, tok) + return {"ok": True, "project": _project_to_api(row)} + + +# ---- /projects/{name}/jobs ------------------------------------------------- + + +@app.post("/projects/{name}/jobs") +async def create_job( + name: str, + request: Request, + body: CreateJobRequest, + authorization: Annotated[str | None, Header()] = None, +): + tok = auth.require_app(request, authorization) + project_row = await db.arun(db.get_project, name) + _project_visible(project_row, tok) + + recipe = json.loads(project_row["recipe_json"]) + subprojects = recipe.get("subprojects", []) + if not subprojects: + raise HTTPException(400, "project has no subprojects") + + # Pick the right subproject: + # - explicit body.subproject takes the matching path entry + # - otherwise pick the first subproject that has a non-empty command for + # the requested recipe kind + chosen = None + if body.subproject is not None: + for s in subprojects: + if s.get("path") == body.subproject: + chosen = s + break + if chosen is None: + raise HTTPException(400, f"subproject '{body.subproject}' not found in project") + else: + for s in subprojects: + if s.get(body.recipe): + chosen = s + break + if chosen is None: + raise HTTPException(400, f"no subproject defines a '{body.recipe}' command") + + if not chosen.get(body.recipe): + raise HTTPException(400, f"subproject '{chosen.get('path', '.')}' has no '{body.recipe}' command") + + job_id = str(uuid.uuid4()) + log_path = str(Path(cfg.log_dir) / f"{job_id}.log") + branch = body.branch or project_row["default_branch"] + + # Snapshot the recipe at run-time. Future recipe edits don't retcon this + # job's view of what command should run — every job carries its own + # frozen copy. + snapshot = { + "git_url": project_row["git_url"], + "default_branch": project_row["default_branch"], + "subprojects": subprojects, + "languages": recipe.get("languages", []), + } + + row = await db.arun( + db.insert_job, + job_id=job_id, + project_name=name, + subproject_path=chosen.get("path", "."), + recipe=body.recipe, + branch=branch, + log_path=log_path, + recipe_snapshot_json=json.dumps(snapshot), + ) + await runner.enqueue(job_id) + return {"ok": True, "job_id": job_id, "status": "queued", "job": row} + + +# ---- /jobs ----------------------------------------------------------------- + + +@app.get("/jobs") +async def list_jobs( + request: Request, + authorization: Annotated[str | None, Header()] = None, + project: str | None = None, + status: str | None = None, + limit: int = 50, +): + tok = auth.require_app(request, authorization) + owner = None if tok.is_admin else tok.name + rows = await db.arun( + db.list_jobs, + project_name=project, + status=status, + owner_token=owner, + limit=max(1, min(limit, 500)), + ) + return {"ok": True, "jobs": rows} + + +@app.get("/jobs/{id}") +async def get_job( + id: str, + request: Request, + authorization: Annotated[str | None, Header()] = None, +): + tok = auth.require_app(request, authorization) + row = await db.arun(db.get_job, id) + _job_visible(row, tok) + + log_tail: list[str] = [] + log_path = Path(row["log_path"]) + if log_path.exists(): + try: + # Tail at most 200 lines without reading whole file into memory. + log_tail = _tail_lines(log_path, 200) + except Exception as e: + log.warning("log tail failed for %s: %s", row["log_path"], e) + + return {"ok": True, "job": row, "log_tail": log_tail} + + +@app.get("/jobs/{id}/log") +async def get_job_log( + id: str, + request: Request, + authorization: Annotated[str | None, Header()] = None, +): + tok = auth.require_app(request, authorization) + row = await db.arun(db.get_job, id) + _job_visible(row, tok) + log_path = Path(row["log_path"]) + if not log_path.exists(): + raise HTTPException(404, "log file not present") + return FileResponse(str(log_path), media_type="text/plain", filename=f"{id}.log") + + +@app.get("/jobs/{id}/findings") +async def get_job_findings( + id: str, + request: Request, + authorization: Annotated[str | None, Header()] = None, +): + tok = auth.require_app(request, authorization) + row = await db.arun(db.get_job, id) + _job_visible(row, tok) + findings = await db.arun(db.list_findings, id) + return {"ok": True, "findings": findings} + + +# ---------- helpers --------------------------------------------------------- + + +def _tail_lines(path: Path, n: int) -> list[str]: + """Read the last n lines of a file without slurping the whole thing. + + Implementation: seek backwards in chunks, splitting on \\n. Good enough + for log files in the MB range; if a single line is huge (rare) we'll + read more than the strict minimum. + """ + BLOCK = 4096 + with path.open("rb") as fh: + fh.seek(0, 2) + size = fh.tell() + data = b"" + while size > 0 and data.count(b"\n") <= n: + read = min(BLOCK, size) + size -= read + fh.seek(size) + data = fh.read(read) + data + text = data.decode("utf-8", "replace") + lines = text.splitlines() + return lines[-n:] diff --git a/crafting_table/workspace.py b/crafting_table/workspace.py new file mode 100644 index 0000000..8912143 --- /dev/null +++ b/crafting_table/workspace.py @@ -0,0 +1,195 @@ +"""Workspace materialization — git clone + worktree + gc. + +Layout (per project): + /workspace//.cache/ bare clone of the upstream + /workspace/// worktree for the requested branch+sha + +Strategy: +- First time we see a project: bare clone --bare to .cache/. +- Subsequent jobs: `git fetch` the cache, then `git worktree add` the + requested branch into the per-job dir. +- After the job finishes: `git worktree remove` the per-job dir. Bare clone + stays put for the next run. +- Periodic gc: any worktree dir older than CRAFTING_GC_AGE seconds gets + pruned (defends against orphans from runner crashes). + +Why bare + worktree (not fresh full clones): cargo/maven/gradle caches live +in /caches, but the source tree itself is fast to materialize this way and +leaves zero cross-job contamination. Fresh git clone of a 100MB repo takes +seconds; worktree-add is milliseconds. + +Recipe commands run in the worktree dir (subproject path resolved against +the worktree root). +""" +from __future__ import annotations + +import asyncio +import logging +import shutil +import time +from dataclasses import dataclass +from pathlib import Path + + +log = logging.getLogger("crafting_table.workspace") + + +@dataclass +class WorkspacePaths: + project_root: Path + cache_dir: Path # .cache/ — bare clone + worktree_dir: Path # per-job worktree + + +class WorkspaceManager: + def __init__(self, root: Path): + self.root = Path(root) + self.root.mkdir(parents=True, exist_ok=True) + + def paths_for(self, *, project: str, job_id: str) -> WorkspacePaths: + project_root = self.root / project + return WorkspacePaths( + project_root=project_root, + cache_dir=project_root / ".cache", + worktree_dir=project_root / job_id, + ) + + async def materialize( + self, + *, + project: str, + job_id: str, + git_url: str, + branch: str, + log_fh, + ) -> WorkspacePaths: + """Ensure the per-job worktree exists and is checked out at branch. + + Writes git progress lines into log_fh. Raises CalledProcessError-like + exceptions through if a git step fails — runner.py catches and marks + the job failed. + """ + paths = self.paths_for(project=project, job_id=job_id) + paths.project_root.mkdir(parents=True, exist_ok=True) + + if not paths.cache_dir.exists(): + log_fh.write(f"[workspace] bare clone {git_url} -> {paths.cache_dir}\n") + log_fh.flush() + await _git(["clone", "--bare", git_url, str(paths.cache_dir)], log_fh, cwd=str(paths.project_root)) + else: + log_fh.write(f"[workspace] fetching latest into {paths.cache_dir}\n") + log_fh.flush() + # --prune drops branches deleted upstream so worktree-add doesn't + # silently land on a stale ref. + await _git(["fetch", "--prune", "origin", "+refs/heads/*:refs/heads/*"], log_fh, cwd=str(paths.cache_dir)) + + if paths.worktree_dir.exists(): + # A previous run for the same job_id (replay or restart). Wipe it. + log_fh.write(f"[workspace] removing existing worktree {paths.worktree_dir}\n") + log_fh.flush() + await self._cleanup_worktree(paths) + + log_fh.write(f"[workspace] worktree add {paths.worktree_dir} branch={branch}\n") + log_fh.flush() + await _git( + ["worktree", "add", "--force", str(paths.worktree_dir), branch], + log_fh, + cwd=str(paths.cache_dir), + ) + return paths + + async def cleanup(self, paths: WorkspacePaths) -> None: + """Remove a worktree post-job. Best-effort — failures logged, not raised.""" + try: + await self._cleanup_worktree(paths) + except Exception as e: + log.warning("worktree cleanup failed for %s: %s", paths.worktree_dir, e) + + async def _cleanup_worktree(self, paths: WorkspacePaths) -> None: + if paths.worktree_dir.exists() and paths.cache_dir.exists(): + try: + proc = await asyncio.create_subprocess_exec( + "git", "worktree", "remove", "--force", str(paths.worktree_dir), + cwd=str(paths.cache_dir), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + await proc.wait() + except Exception: + pass + # Fallback: rmtree if the worktree dir is still around. + if paths.worktree_dir.exists(): + shutil.rmtree(paths.worktree_dir, ignore_errors=True) + + async def gc(self, *, age_secs: int) -> dict: + """Sweep worktrees older than age_secs. Returns counters.""" + cutoff = time.time() - age_secs + removed = 0 + scanned = 0 + for project_dir in self.root.iterdir(): + if not project_dir.is_dir(): + continue + cache_dir = project_dir / ".cache" + for child in project_dir.iterdir(): + scanned += 1 + if child.name == ".cache": + continue + if not child.is_dir(): + continue + try: + mtime = child.stat().st_mtime + except OSError: + continue + if mtime > cutoff: + continue + # Old worktree — prune. + if cache_dir.exists(): + try: + proc = await asyncio.create_subprocess_exec( + "git", "worktree", "remove", "--force", str(child), + cwd=str(cache_dir), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + await proc.wait() + except Exception: + pass + shutil.rmtree(child, ignore_errors=True) + removed += 1 + + # Periodic `git gc` on the bare clone if it's been quiet for >7d + if cache_dir.exists(): + try: + cache_mtime = cache_dir.stat().st_mtime + if time.time() - cache_mtime > 7 * 86400: + proc = await asyncio.create_subprocess_exec( + "git", "gc", "--prune=now", "--quiet", + cwd=str(cache_dir), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + await proc.wait() + except Exception: + pass + + return {"scanned": scanned, "removed": removed} + + +async def _git(args: list[str], log_fh, *, cwd: str | None = None) -> None: + """Run `git ` and stream stdout+stderr to log_fh.""" + proc = await asyncio.create_subprocess_exec( + "git", *args, + cwd=cwd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + assert proc.stdout is not None + while True: + line = await proc.stdout.readline() + if not line: + break + log_fh.write(line.decode("utf-8", "replace")) + log_fh.flush() + rc = await proc.wait() + if rc != 0: + raise RuntimeError(f"git {args[0]} exited {rc}") diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..457d1e2 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,40 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "crafting-table" +version = "0.1.0" +description = "Polyglot dev/build/audit container — HTTP API + async job runner." +readme = "README.md" +license = { text = "MIT" } +requires-python = ">=3.11" +authors = [ + { name = "Kayos", email = "kayos@sulkta.com" }, +] +dependencies = [ + "fastapi>=0.115,<1.0", + "uvicorn[standard]>=0.30,<1.0", + "pydantic>=2.7,<3.0", +] + +[project.optional-dependencies] +test = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", + "httpx>=0.27", +] + +[tool.setuptools.packages.find] +include = ["crafting_table*"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] +filterwarnings = [ + # asyncio's BaseSubprocessTransport.__del__ can fire after the test loop + # closes when subprocess cleanup races with pytest-asyncio's loop teardown. + # The test logic verifies the subprocess actually died; the late __del__ + # touch is harmless. Suppressing keeps pytest's AST cache from blowing up. + "ignore::pytest.PytestUnraisableExceptionWarning", +] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f71f250 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +fastapi==0.115.5 +uvicorn[standard]==0.32.1 +pydantic==2.9.2 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..38847cc --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,159 @@ +"""Shared pytest fixtures for crafting-table tests. + +Every test gets: +- A fresh tmp dir for db / log_dir / workspace_root / admin_bearer file +- Module reload so server.py's module-level singletons pick up our env +- IP allowlist patched to accept 127.0.0.1 (TestClient reports 'testclient') +- An admin bearer + a pre-minted app token + a second app token for isolation tests + +Tests can then use `client` (TestClient + ctx dict) for HTTP-level tests, or +`db_only` if they only need a DB instance (no server boot). +""" +from __future__ import annotations + +import os +import sys +import time +from pathlib import Path + +import pytest + + +# ---------- per-test workspace -------------------------------------------- + + +@pytest.fixture +def tmp_workspace(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + db_path = tmp_path / "crafting.db" + log_dir = tmp_path / "jobs" + workspace_root = tmp_path / "workspace" + admin_bearer = tmp_path / "admin-bearer.txt" + log_dir.mkdir() + workspace_root.mkdir() + + monkeypatch.setenv("CRAFTING_DB", str(db_path)) + monkeypatch.setenv("CRAFTING_LOG_DIR", str(log_dir)) + monkeypatch.setenv("CRAFTING_WORKSPACE", str(workspace_root)) + monkeypatch.setenv("CRAFTING_ADMIN_BEARER", str(admin_bearer)) + monkeypatch.setenv("CRAFTING_MAX_CONCURRENT", "2") + monkeypatch.setenv("CRAFTING_DEFAULT_JOB_TIMEOUT", "10") + monkeypatch.setenv("CRAFTING_LAN_CIDRS", "127.0.0.0/8,::1/128,10.0.0.0/8") + monkeypatch.setenv("CRAFTING_GC_INTERVAL", "9999") + monkeypatch.setenv("CRAFTING_GC_AGE", "86400") + + yield { + "db_path": db_path, + "log_dir": log_dir, + "workspace_root": workspace_root, + "admin_bearer": admin_bearer, + } + + +@pytest.fixture +def db_only(tmp_workspace): + """For tests that don't need the server — just a DB instance.""" + # Import locally so env vars are in effect before module import. + from crafting_table.db import DB + return DB(str(tmp_workspace["db_path"])) + + +# ---------- server reload + TestClient ------------------------------------ + + +def _reload_server_modules(): + """Drop cached modules so module-level singletons re-bind with current env.""" + for name in [ + "crafting_table.server", + "crafting_table.runner", + "crafting_table.workspace", + "crafting_table.auth", + "crafting_table.db", + "crafting_table.config", + "crafting_table.models", + "crafting_table", + ]: + sys.modules.pop(name, None) + + +@pytest.fixture +def server(tmp_workspace, monkeypatch: pytest.MonkeyPatch): + """Reload server module + return the module so tests can poke internals.""" + _reload_server_modules() + + from crafting_table import auth as auth_mod # noqa: WPS433 + monkeypatch.setattr(auth_mod, "_client_ip", lambda _req: "127.0.0.1") + + from crafting_table import server as srv # noqa: WPS433 + return srv + + +@pytest.fixture +def client(server, tmp_workspace): + """FastAPI TestClient + ctx dict (admin_bearer, app_token, other_token, ...). + + The TestClient context-manages lifespan, so on entry the runner starts + + admin token is bootstrapped. On exit the runner stops cleanly. + """ + from fastapi.testclient import TestClient + + with TestClient(server.app) as tc: + admin_bearer = tmp_workspace["admin_bearer"].read_text().strip() + + # Mint two app tokens for isolation tests + r = tc.post( + "/admin/tokens", + headers={"Authorization": f"Bearer {admin_bearer}"}, + json={"name": "alpha", "is_admin": False, "ip_cidrs": []}, + ) + assert r.status_code == 200, r.text + alpha_bearer = r.json()["bearer"] + + r2 = tc.post( + "/admin/tokens", + headers={"Authorization": f"Bearer {admin_bearer}"}, + json={"name": "bravo", "is_admin": False, "ip_cidrs": []}, + ) + assert r2.status_code == 200, r2.text + bravo_bearer = r2.json()["bearer"] + + yield tc, { + "admin_bearer": admin_bearer, + "alpha_bearer": alpha_bearer, + "alpha_name": "alpha", + "bravo_bearer": bravo_bearer, + "bravo_name": "bravo", + "server": server, + } + + +# ---------- shared sample data --------------------------------------------- + + +def sample_project_payload(*, name: str = "demo", recipe_cmds: dict | None = None) -> dict: + cmds = recipe_cmds or { + "build": "echo build && true", + "test": "echo test && true", + "lint": "echo lint && true", + "audit": "echo audit && true", + } + return { + "name": name, + "git_url": "/dev/null", # tests don't actually clone unless they want to + "default_branch": "main", + "languages": ["python"], + "subprojects": [ + { + "path": ".", + "language": "python", + "build": cmds.get("build"), + "test": cmds.get("test"), + "lint": cmds.get("lint"), + "audit": cmds.get("audit"), + "timeout_secs": 5, + } + ], + "schedule": {"audit": "manual", "test": None, "build": None, "lint": None}, + "notify": {"email": [], "on": [], "auto_patch": False}, + "created_at": 0, + "updated_at": 0, + } diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..f7881c8 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,122 @@ +"""Auth — bearer hashing, IP allowlist, admin vs app, revoked token reject.""" +from __future__ import annotations + +import pytest +from fastapi import HTTPException + +from crafting_table.auth import Auth, _ip_in_any, _const_eq + + +class _FakeRequest: + def __init__(self, ip: str): + class C: + host = ip + self.client = C() + + +def test_bearer_hash_lookup_match(db_only): + db_only.insert_token(name="x", bearer="ct_secret_x", is_admin=False, ip_cidrs=None) + rec = db_only.lookup_token_by_bearer("ct_secret_x") + assert rec is not None + assert rec["name"] == "x" + + +def test_bearer_hash_lookup_miss(db_only): + assert db_only.lookup_token_by_bearer("definitely-not-real") is None + + +def test_ip_in_any_loopback_always_allowed(): + assert _ip_in_any("127.0.0.1", []) is True + assert _ip_in_any("::1", []) is True + + +def test_ip_in_any_match_cidr(): + assert _ip_in_any("192.168.0.5", ["192.168.0.0/16"]) is True + assert _ip_in_any("10.1.2.3", ["10.0.0.0/8"]) is True + + +def test_ip_in_any_miss(): + assert _ip_in_any("8.8.8.8", ["192.168.0.0/16", "10.0.0.0/8"]) is False + + +def test_ip_in_any_invalid_input(): + assert _ip_in_any("not-an-ip", ["10.0.0.0/8"]) is False + + +def test_const_eq_basic(): + assert _const_eq("abc", "abc") is True + assert _const_eq("abc", "abd") is False + assert _const_eq("abc", "abcd") is False + + +def test_require_app_missing_bearer(db_only): + a = Auth(db=db_only, lan_cidrs=["127.0.0.0/8"]) + with pytest.raises(HTTPException) as ei: + a.require_app(_FakeRequest("127.0.0.1"), None) + assert ei.value.status_code == 401 + + +def test_require_app_bad_bearer(db_only): + a = Auth(db=db_only, lan_cidrs=["127.0.0.0/8"]) + with pytest.raises(HTTPException) as ei: + a.require_app(_FakeRequest("127.0.0.1"), "Bearer not-a-real-token") + assert ei.value.status_code == 403 + + +def test_require_app_revoked_rejects(db_only): + a = Auth(db=db_only, lan_cidrs=["127.0.0.0/8"]) + db_only.insert_token(name="r", bearer="ct_r", is_admin=False, ip_cidrs=None) + db_only.revoke_token("r") + with pytest.raises(HTTPException) as ei: + a.require_app(_FakeRequest("127.0.0.1"), "Bearer ct_r") + assert ei.value.status_code == 403 + + +def test_require_app_per_token_ip_allowlist(db_only): + a = Auth(db=db_only, lan_cidrs=["127.0.0.0/8", "10.0.0.0/8"]) + db_only.insert_token(name="ip", bearer="ct_ip", is_admin=False, ip_cidrs=["10.0.0.0/8"]) + # 10.x is in the allowlist + tok = a.require_app(_FakeRequest("10.0.0.5"), "Bearer ct_ip") + assert tok.name == "ip" + # Loopback is allowed because _ip_in_any short-circuits on is_loopback + tok2 = a.require_app(_FakeRequest("127.0.0.1"), "Bearer ct_ip") + assert tok2.name == "ip" + + +def test_require_admin_rejects_non_admin(db_only): + a = Auth(db=db_only, lan_cidrs=["127.0.0.0/8"]) + db_only.insert_token(name="u", bearer="ct_u", is_admin=False, ip_cidrs=None) + with pytest.raises(HTTPException) as ei: + a.require_admin(_FakeRequest("127.0.0.1"), "Bearer ct_u") + assert ei.value.status_code == 403 + + +def test_require_admin_accepts_admin(db_only): + a = Auth(db=db_only, lan_cidrs=["127.0.0.0/8"]) + db_only.insert_token(name="adm", bearer="ct_adm", is_admin=True, ip_cidrs=None) + tok = a.require_admin(_FakeRequest("127.0.0.1"), "Bearer ct_adm") + assert tok.is_admin is True + + +def test_global_ip_blocks_off_lan(db_only): + a = Auth(db=db_only, lan_cidrs=["192.168.0.0/16"]) + with pytest.raises(HTTPException) as ei: + a.require_global_ip(_FakeRequest("8.8.8.8")) + assert ei.value.status_code == 403 + + +def test_bootstrap_writes_admin_bearer(tmp_workspace): + """First boot mints + writes the bearer file. Second boot reads existing.""" + from crafting_table.db import DB + from crafting_table.auth import Auth + + db = DB(str(tmp_workspace["db_path"])) + a = Auth(db=db, lan_cidrs=["127.0.0.0/8"]) + bearer1 = a.bootstrap_admin(tmp_workspace["admin_bearer"]) + assert tmp_workspace["admin_bearer"].exists() + assert tmp_workspace["admin_bearer"].stat().st_mode & 0o777 == 0o600 + assert bearer1.startswith("ct_") + + # Second call returns the same bearer (read off disk) + bearer2 = a.bootstrap_admin(tmp_workspace["admin_bearer"]) + assert bearer2 == bearer1 diff --git a/tests/test_db.py b/tests/test_db.py new file mode 100644 index 0000000..c310327 --- /dev/null +++ b/tests/test_db.py @@ -0,0 +1,214 @@ +"""DB-level tests — schema, migrations, round-trips.""" +from __future__ import annotations + +import json +import time + +import pytest + + +def test_migrations_applied(db_only): + versions = db_only.applied_migrations() + # All 5 migrations from the MIGRATIONS list should land on first boot. + assert "001_schema_migrations" in versions + assert "002_tokens" in versions + assert "003_projects" in versions + assert "004_jobs" in versions + assert "005_findings" in versions + + +def test_migrations_idempotent(db_only): + """Re-running migrate() is a no-op once applied — the INSERT OR IGNORE + on schema_migrations is the test we care about here.""" + before = set(db_only.applied_migrations()) + second_pass = db_only.migrate() + assert second_pass == [] # nothing applied + after = set(db_only.applied_migrations()) + assert before == after + + +def test_schema_has_required_tables(db_only): + with db_only._conn() as c: + rows = c.execute( + "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name" + ).fetchall() + names = {r["name"] for r in rows} + for required in ("schema_migrations", "tokens", "projects", "jobs", "findings"): + assert required in names, f"missing table: {required}" + + +def test_token_round_trip(db_only): + db_only.insert_token( + name="alpha", + bearer="ct_test_alpha", + is_admin=False, + ip_cidrs=["10.0.0.0/8"], + ) + rec = db_only.lookup_token_by_bearer("ct_test_alpha") + assert rec is not None + assert rec["name"] == "alpha" + assert rec["is_admin"] is False + assert rec["ip_cidrs"] == ["10.0.0.0/8"] + + +def test_token_revoke(db_only): + db_only.insert_token(name="bravo", bearer="ct_b", is_admin=False, ip_cidrs=None) + assert db_only.lookup_token_by_bearer("ct_b") is not None + assert db_only.revoke_token("bravo") is True + assert db_only.lookup_token_by_bearer("ct_b") is None + + +def test_project_upsert_and_get(db_only): + db_only.insert_token(name="owner1", bearer="t1", is_admin=False, ip_cidrs=None) + row = db_only.upsert_project( + name="proj-a", + git_url="https://x.example/repo.git", + default_branch="main", + recipe_json='{"languages":["python"],"subprojects":[]}', + owner_token="owner1", + ) + assert row["name"] == "proj-a" + fetched = db_only.get_project("proj-a") + assert fetched is not None + assert fetched["git_url"] == "https://x.example/repo.git" + assert fetched["owner_token"] == "owner1" + + +def test_project_update_keeps_created_at(db_only): + db_only.insert_token(name="o", bearer="t", is_admin=False, ip_cidrs=None) + a = db_only.upsert_project( + name="p", git_url="g1", default_branch="main", + recipe_json="{}", owner_token="o", + ) + time.sleep(1.1) # so updated_at moves + b = db_only.upsert_project( + name="p", git_url="g2", default_branch="trunk", + recipe_json="{}", owner_token="o", + ) + assert b["created_at"] == a["created_at"] + assert b["updated_at"] >= a["updated_at"] + assert b["git_url"] == "g2" + assert b["default_branch"] == "trunk" + + +def test_project_list_filters_by_owner(db_only): + db_only.insert_token(name="o1", bearer="ta", is_admin=False, ip_cidrs=None) + db_only.insert_token(name="o2", bearer="tb", is_admin=False, ip_cidrs=None) + db_only.upsert_project(name="p1", git_url="g", default_branch="main", + recipe_json="{}", owner_token="o1") + db_only.upsert_project(name="p2", git_url="g", default_branch="main", + recipe_json="{}", owner_token="o2") + only_o1 = db_only.list_projects(owner_token="o1") + assert {p["name"] for p in only_o1} == {"p1"} + everyone = db_only.list_projects() + assert {p["name"] for p in everyone} == {"p1", "p2"} + + +def test_project_delete_cascades_jobs_and_findings(db_only): + db_only.insert_token(name="o", bearer="t", is_admin=False, ip_cidrs=None) + db_only.upsert_project(name="p", git_url="g", default_branch="main", + recipe_json="{}", owner_token="o") + db_only.insert_job( + job_id="j1", project_name="p", subproject_path=".", + recipe="test", branch="main", + log_path="/tmp/x.log", recipe_snapshot_json="{}", + ) + db_only.insert_finding( + job_id="j1", kind="lint", severity="warn", + message="m", fingerprint="fp1", + ) + assert db_only.get_job("j1") is not None + assert len(db_only.list_findings("j1")) == 1 + + db_only.delete_project("p") + # FK ON DELETE CASCADE should kill the job and via second cascade the finding + assert db_only.get_job("j1") is None + assert db_only.list_findings("j1") == [] + + +def test_job_round_trip(db_only): + db_only.insert_token(name="o", bearer="t", is_admin=False, ip_cidrs=None) + db_only.upsert_project(name="p", git_url="g", default_branch="main", + recipe_json="{}", owner_token="o") + row = db_only.insert_job( + job_id="j2", project_name="p", subproject_path="clients/rust", + recipe="audit", branch="main", + log_path="/tmp/y.log", + recipe_snapshot_json=json.dumps({"subprojects": [{"path": "clients/rust"}]}), + ) + assert row["status"] == "queued" + + db_only.mark_job_running("j2") + j = db_only.get_job("j2") + assert j["status"] == "running" + assert j["started_at"] is not None + + db_only.mark_job_finished(job_id="j2", status="succeeded", exit_code=0) + j2 = db_only.get_job("j2") + assert j2["status"] == "succeeded" + assert j2["exit_code"] == 0 + assert j2["finished_at"] is not None + + +def test_orphaned_running_jobs_marked_failed_on_boot(db_only, tmp_path): + db_only.insert_token(name="o", bearer="t", is_admin=False, ip_cidrs=None) + db_only.upsert_project(name="p", git_url="g", default_branch="main", + recipe_json="{}", owner_token="o") + db_only.insert_job( + job_id="orphan", project_name="p", subproject_path=".", + recipe="audit", branch="main", + log_path=str(tmp_path / "orphan.log"), + recipe_snapshot_json="{}", + ) + db_only.mark_job_running("orphan") + + ids = db_only.mark_orphaned_jobs_failed(log_dir=tmp_path) + assert "orphan" in ids + j = db_only.get_job("orphan") + assert j["status"] == "failed" + assert j["exit_code"] == -1 + # The synthetic log line should have landed in the orphan's log file + log_text = (tmp_path / "orphan.log").read_text() + assert "runner restart" in log_text + + +def test_finding_round_trip(db_only): + db_only.insert_token(name="o", bearer="t", is_admin=False, ip_cidrs=None) + db_only.upsert_project(name="p", git_url="g", default_branch="main", + recipe_json="{}", owner_token="o") + db_only.insert_job( + job_id="jx", project_name="p", subproject_path=".", + recipe="lint", branch="main", + log_path="/tmp/z.log", recipe_snapshot_json="{}", + ) + db_only.insert_finding( + job_id="jx", kind="lint", severity="warn", + file="src/foo.py", line=12, code="E501", + message="line too long", fingerprint="fp-1", + ) + findings = db_only.list_findings("jx") + assert len(findings) == 1 + assert findings[0]["file"] == "src/foo.py" + assert findings[0]["line"] == 12 + assert findings[0]["fingerprint"] == "fp-1" + + +def test_jobs_filter_by_owner(db_only): + db_only.insert_token(name="oa", bearer="ta", is_admin=False, ip_cidrs=None) + db_only.insert_token(name="ob", bearer="tb", is_admin=False, ip_cidrs=None) + db_only.upsert_project(name="pa", git_url="g", default_branch="main", + recipe_json="{}", owner_token="oa") + db_only.upsert_project(name="pb", git_url="g", default_branch="main", + recipe_json="{}", owner_token="ob") + db_only.insert_job(job_id="ja", project_name="pa", subproject_path=".", + recipe="test", branch="main", + log_path="/tmp/a.log", recipe_snapshot_json="{}") + db_only.insert_job(job_id="jb", project_name="pb", subproject_path=".", + recipe="test", branch="main", + log_path="/tmp/b.log", recipe_snapshot_json="{}") + + only_oa = db_only.list_jobs(owner_token="oa") + assert {j["id"] for j in only_oa} == {"ja"} + + only_running = db_only.list_jobs(status="running") + assert only_running == [] diff --git a/tests/test_e2e_register_run.py b/tests/test_e2e_register_run.py new file mode 100644 index 0000000..e80c53f --- /dev/null +++ b/tests/test_e2e_register_run.py @@ -0,0 +1,122 @@ +"""E2E — register a project, kick off a job, poll until terminal, assert. + +Uses a local file-protocol git URL (we init a tiny repo in tmp_path) so the +real workspace materialization path runs end-to-end without network. Recipe +is a no-op echo so we exit 0. +""" +from __future__ import annotations + +import asyncio +import shutil +import subprocess +import time +from pathlib import Path + +import pytest + +from tests.conftest import sample_project_payload + + +def _make_local_git_repo(root: Path) -> str: + """Init a fresh repo with one commit and return the on-disk path that can + be passed as a git_url (git clone supports plain paths).""" + if shutil.which("git") is None: + pytest.skip("git binary not present in test environment") + repo = root / "fixture-repo" + repo.mkdir() + subprocess.run(["git", "init", "-q", "-b", "main"], cwd=repo, check=True) + subprocess.run(["git", "config", "user.email", "test@example"], cwd=repo, check=True) + subprocess.run(["git", "config", "user.name", "test"], cwd=repo, check=True) + subprocess.run(["git", "config", "commit.gpgsign", "false"], cwd=repo, check=True) + (repo / "README.md").write_text("hello\n") + subprocess.run(["git", "add", "README.md"], cwd=repo, check=True) + subprocess.run(["git", "commit", "-q", "-m", "initial"], cwd=repo, check=True) + return str(repo) + + +def test_register_run_poll_succeeds(client, tmp_workspace, tmp_path): + tc, ctx = client + + # Make a real local repo + git_url = _make_local_git_repo(tmp_path) + + payload = sample_project_payload(name="e2e-proj") + payload["git_url"] = git_url + payload["subprojects"][0]["test"] = "echo running-test && exit 0" + payload["subprojects"][0]["audit"] = "echo running-audit && exit 0" + payload["subprojects"][0]["timeout_secs"] = 30 + + r = tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=payload, + ) + assert r.status_code == 200, r.text + + r2 = tc.post( + "/projects/e2e-proj/jobs", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json={"recipe": "test"}, + ) + assert r2.status_code == 200, r2.text + job_id = r2.json()["job_id"] + + # Poll for terminal status + deadline = time.monotonic() + 30 + final = None + while time.monotonic() < deadline: + r3 = tc.get( + f"/jobs/{job_id}", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + ) + assert r3.status_code == 200 + body = r3.json() + status = body["job"]["status"] + if status in ("succeeded", "failed", "timed_out", "cancelled"): + final = body + break + time.sleep(0.2) + + assert final is not None, "job never reached terminal state" + assert final["job"]["status"] == "succeeded", final + assert final["job"]["exit_code"] == 0 + # Log tail should contain our echoed line + assert any("running-test" in line for line in final["log_tail"]), final["log_tail"] + + +def test_register_run_failing_recipe(client, tmp_workspace, tmp_path): + tc, ctx = client + git_url = _make_local_git_repo(tmp_path) + + payload = sample_project_payload(name="e2e-fail") + payload["git_url"] = git_url + payload["subprojects"][0]["test"] = "exit 7" + payload["subprojects"][0]["timeout_secs"] = 20 + + r = tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=payload, + ) + assert r.status_code == 200 + + r2 = tc.post( + "/projects/e2e-fail/jobs", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json={"recipe": "test"}, + ) + job_id = r2.json()["job_id"] + + deadline = time.monotonic() + 30 + while time.monotonic() < deadline: + r3 = tc.get( + f"/jobs/{job_id}", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + ) + body = r3.json() + if body["job"]["status"] in ("succeeded", "failed", "timed_out", "cancelled"): + assert body["job"]["status"] == "failed" + assert body["job"]["exit_code"] == 7 + return + time.sleep(0.2) + raise AssertionError("job never finished") diff --git a/tests/test_jobs_api.py b/tests/test_jobs_api.py new file mode 100644 index 0000000..74e2c3a --- /dev/null +++ b/tests/test_jobs_api.py @@ -0,0 +1,248 @@ +"""HTTP-level tests for /projects/{name}/jobs and /jobs surfaces. + +These tests stub out the runner so we don't actually clone/run subprocesses +inside the API tests — see test_runner.py for the exec-side coverage. +""" +from __future__ import annotations + +import time + +import pytest + +from tests.conftest import sample_project_payload + + +def _stub_runner(server): + """Replace runner.enqueue with a no-op so we can exercise the API surface + without driving the dispatcher.""" + server.runner.enqueue = _aiono_op # type: ignore[assignment] + + +async def _aiono_op(*_a, **_k): + return None + + +def test_create_job_returns_id(client): + tc, ctx = client + _stub_runner(ctx["server"]) + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="job-proj"), + ) + r = tc.post( + "/projects/job-proj/jobs", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json={"recipe": "audit"}, + ) + assert r.status_code == 200, r.text + body = r.json() + assert body["ok"] is True + assert isinstance(body["job_id"], str) and len(body["job_id"]) >= 16 + assert body["status"] == "queued" + + +def test_create_job_invalid_recipe(client): + tc, ctx = client + _stub_runner(ctx["server"]) + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="invrec"), + ) + r = tc.post( + "/projects/invrec/jobs", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json={"recipe": "not-a-real-recipe"}, + ) + # Pydantic Literal validation -> 422 + assert r.status_code in (400, 422) + + +def test_create_job_subproject_without_recipe_command(client): + tc, ctx = client + _stub_runner(ctx["server"]) + payload = sample_project_payload(name="missing-cmd") + # Drop the audit command + payload["subprojects"][0]["audit"] = None + payload["subprojects"][0]["build"] = None + payload["subprojects"][0]["test"] = None + payload["subprojects"][0]["lint"] = "echo lint" + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=payload, + ) + r = tc.post( + "/projects/missing-cmd/jobs", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json={"recipe": "audit"}, + ) + assert r.status_code == 400 + + +def test_list_jobs_filters_by_project_and_token(client): + tc, ctx = client + _stub_runner(ctx["server"]) + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="pa"), + ) + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['bravo_bearer']}"}, + json=sample_project_payload(name="pb"), + ) + tc.post( + "/projects/pa/jobs", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json={"recipe": "test"}, + ) + tc.post( + "/projects/pb/jobs", + headers={"Authorization": f"Bearer {ctx['bravo_bearer']}"}, + json={"recipe": "test"}, + ) + + r = tc.get( + "/jobs", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + ) + assert r.status_code == 200 + jobs = r.json()["jobs"] + assert all(j["project_name"] == "pa" for j in jobs) + + +def test_list_jobs_filter_status(client): + tc, ctx = client + _stub_runner(ctx["server"]) + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="ps"), + ) + tc.post( + "/projects/ps/jobs", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json={"recipe": "test"}, + ) + r = tc.get( + "/jobs?status=queued", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + ) + assert r.status_code == 200 + assert all(j["status"] == "queued" for j in r.json()["jobs"]) + r2 = tc.get( + "/jobs?status=succeeded", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + ) + assert r2.json()["jobs"] == [] + + +def test_get_job_includes_log_tail(client, tmp_workspace): + tc, ctx = client + _stub_runner(ctx["server"]) + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="logged"), + ) + r = tc.post( + "/projects/logged/jobs", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json={"recipe": "test"}, + ) + job_id = r.json()["job_id"] + + # Plant a fake log file + log_path = tmp_workspace["log_dir"] / f"{job_id}.log" + log_path.write_text("\n".join(f"line-{i}" for i in range(300)) + "\n") + + r2 = tc.get( + f"/jobs/{job_id}", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + ) + assert r2.status_code == 200 + body = r2.json() + assert "log_tail" in body + assert len(body["log_tail"]) == 200 + assert body["log_tail"][-1] == "line-299" + + +def test_get_job_other_token_404(client): + tc, ctx = client + _stub_runner(ctx["server"]) + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="cross-job"), + ) + r = tc.post( + "/projects/cross-job/jobs", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json={"recipe": "test"}, + ) + job_id = r.json()["job_id"] + r2 = tc.get( + f"/jobs/{job_id}", + headers={"Authorization": f"Bearer {ctx['bravo_bearer']}"}, + ) + assert r2.status_code == 404 + + +def test_get_findings_empty_in_wave1(client): + tc, ctx = client + _stub_runner(ctx["server"]) + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="findings-test"), + ) + r = tc.post( + "/projects/findings-test/jobs", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json={"recipe": "audit"}, + ) + job_id = r.json()["job_id"] + r2 = tc.get( + f"/jobs/{job_id}/findings", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + ) + assert r2.status_code == 200 + assert r2.json()["findings"] == [] + + +def test_get_log_streams_file(client, tmp_workspace): + tc, ctx = client + _stub_runner(ctx["server"]) + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="logstream"), + ) + r = tc.post( + "/projects/logstream/jobs", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json={"recipe": "test"}, + ) + job_id = r.json()["job_id"] + log_path = tmp_workspace["log_dir"] / f"{job_id}.log" + log_path.write_text("hello world\nmore lines\n") + + r2 = tc.get( + f"/jobs/{job_id}/log", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + ) + assert r2.status_code == 200 + assert "hello world" in r2.text + + +def test_healthz_open_to_lan(client): + tc, _ = client + r = tc.get("/healthz") + assert r.status_code == 200 + body = r.json() + assert body["ok"] is True + assert body["db"] == "ok" + assert "runner" in body and "max" in body["runner"] diff --git a/tests/test_projects_api.py b/tests/test_projects_api.py new file mode 100644 index 0000000..ecf5c3e --- /dev/null +++ b/tests/test_projects_api.py @@ -0,0 +1,235 @@ +"""HTTP-level tests for the /projects surface.""" +from __future__ import annotations + +import pytest + +from tests.conftest import sample_project_payload + + +def test_register_project_returns_owner(client): + tc, ctx = client + r = tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="proj-alpha"), + ) + assert r.status_code == 200, r.text + body = r.json() + assert body["ok"] is True + assert body["project"]["name"] == "proj-alpha" + + +def test_register_requires_auth(client): + tc, _ = client + r = tc.post("/projects", json=sample_project_payload()) + assert r.status_code == 401 + + +def test_register_rejects_unknown_token(client): + tc, _ = client + r = tc.post( + "/projects", + headers={"Authorization": "Bearer ct_definitely_not_real"}, + json=sample_project_payload(), + ) + assert r.status_code == 403 + + +def test_list_projects_filters_by_token(client): + tc, ctx = client + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="alpha-1"), + ) + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['bravo_bearer']}"}, + json=sample_project_payload(name="bravo-1"), + ) + r_a = tc.get("/projects", headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}) + assert r_a.status_code == 200 + names = {p["name"] for p in r_a.json()["projects"]} + assert names == {"alpha-1"} + + # Admin sees both + r_admin = tc.get("/projects", headers={"Authorization": f"Bearer {ctx['admin_bearer']}"}) + admin_names = {p["name"] for p in r_admin.json()["projects"]} + assert admin_names == {"alpha-1", "bravo-1"} + + +def test_get_project_404_for_other_token(client): + """Existence-leak guard: bravo querying alpha's project gets 404 (not 403).""" + tc, ctx = client + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="secret-proj"), + ) + r = tc.get( + "/projects/secret-proj", + headers={"Authorization": f"Bearer {ctx['bravo_bearer']}"}, + ) + assert r.status_code == 404 + + +def test_get_project_own(client): + tc, ctx = client + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="own-proj"), + ) + r = tc.get( + "/projects/own-proj", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + ) + assert r.status_code == 200 + assert r.json()["project"]["name"] == "own-proj" + + +def test_update_project_owner_can(client): + tc, ctx = client + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="upd-proj"), + ) + payload = sample_project_payload(name="upd-proj") + payload["git_url"] = "https://changed.example/repo.git" + r = tc.put( + "/projects/upd-proj", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=payload, + ) + assert r.status_code == 200 + assert r.json()["project"]["git_url"] == "https://changed.example/repo.git" + + +def test_update_project_other_token_404(client): + tc, ctx = client + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="no-touch"), + ) + payload = sample_project_payload(name="no-touch") + payload["git_url"] = "https://hijack.example/x.git" + r = tc.put( + "/projects/no-touch", + headers={"Authorization": f"Bearer {ctx['bravo_bearer']}"}, + json=payload, + ) + assert r.status_code == 404 + + +def test_delete_project_owner_can(client): + tc, ctx = client + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="del-proj"), + ) + r = tc.delete( + "/projects/del-proj", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + ) + assert r.status_code == 200 + # Confirm gone + r2 = tc.get( + "/projects/del-proj", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + ) + assert r2.status_code == 404 + + +def test_delete_project_other_token_404(client): + tc, ctx = client + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="hands-off"), + ) + r = tc.delete( + "/projects/hands-off", + headers={"Authorization": f"Bearer {ctx['bravo_bearer']}"}, + ) + assert r.status_code == 404 + + +def test_admin_can_modify_any_project(client): + tc, ctx = client + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="adminable"), + ) + payload = sample_project_payload(name="adminable") + payload["git_url"] = "https://admin-edit.example/x.git" + r = tc.put( + "/projects/adminable", + headers={"Authorization": f"Bearer {ctx['admin_bearer']}"}, + json=payload, + ) + assert r.status_code == 200 + assert r.json()["project"]["git_url"] == "https://admin-edit.example/x.git" + + +def test_register_duplicate_409_for_owner(client): + tc, ctx = client + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="dup"), + ) + r = tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="dup"), + ) + assert r.status_code == 409 + + +def test_register_duplicate_404_for_other(client): + """Other-token re-registering an existing name gets 404 (existence leak guard).""" + tc, ctx = client + tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}, + json=sample_project_payload(name="hidden"), + ) + r = tc.post( + "/projects", + headers={"Authorization": f"Bearer {ctx['bravo_bearer']}"}, + json=sample_project_payload(name="hidden"), + ) + assert r.status_code == 404 + + +def test_admin_token_endpoint_admin_only(client): + tc, ctx = client + r = tc.get("/admin/tokens", headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"}) + assert r.status_code == 403 + r2 = tc.get("/admin/tokens", headers={"Authorization": f"Bearer {ctx['admin_bearer']}"}) + assert r2.status_code == 200 + + +def test_admin_revoke_token(client): + tc, ctx = client + r = tc.delete( + f"/admin/tokens/{ctx['bravo_name']}", + headers={"Authorization": f"Bearer {ctx['admin_bearer']}"}, + ) + assert r.status_code == 200 + # Bravo's token should now fail + r2 = tc.get("/projects", headers={"Authorization": f"Bearer {ctx['bravo_bearer']}"}) + assert r2.status_code == 403 + + +def test_admin_cannot_revoke_admin(client): + tc, ctx = client + r = tc.delete( + "/admin/tokens/admin", + headers={"Authorization": f"Bearer {ctx['admin_bearer']}"}, + ) + assert r.status_code == 400 diff --git a/tests/test_runner.py b/tests/test_runner.py new file mode 100644 index 0000000..ea3632e --- /dev/null +++ b/tests/test_runner.py @@ -0,0 +1,232 @@ +"""Runner — exec recipe, timeout, bounded concurrency, orphan recovery. + +Strategy: bypass the workspace materialization step (no real git URLs in +tests) by stubbing WorkspaceManager.materialize to return a path the test +controls. Recipes are stub shell strings (`echo`, `sleep`, `false`). +""" +from __future__ import annotations + +import asyncio +import json +import time +from pathlib import Path + +import pytest + +from crafting_table.db import DB +from crafting_table.runner import Runner +from crafting_table.workspace import WorkspaceManager, WorkspacePaths + + +# ---------- helper: stub materialize() so we don't need real git ---------- + + +class _StubWorkspace(WorkspaceManager): + def __init__(self, root: Path): + super().__init__(root) + self._cleanups: list[Path] = [] + + async def materialize(self, *, project, job_id, git_url, branch, log_fh): + # Echo what would happen, write to log, return a path inside root. + worktree = self.root / project / job_id + worktree.mkdir(parents=True, exist_ok=True) + log_fh.write(f"[stub-workspace] would clone {git_url}@{branch} to {worktree}\n") + log_fh.flush() + return WorkspacePaths( + project_root=self.root / project, + cache_dir=self.root / project / ".cache", + worktree_dir=worktree, + ) + + async def cleanup(self, paths): + self._cleanups.append(paths.worktree_dir) + + +def _seed_project_and_job( + db: DB, *, recipe_cmd: str, timeout_secs: int = 5, recipe_kind: str = "test" +) -> str: + if not db.get_token("o"): + db.insert_token(name="o", bearer="t-runner", is_admin=False, ip_cidrs=None) + sub = { + "path": ".", + "language": "python", + "build": None, "test": None, "lint": None, "audit": None, + "timeout_secs": timeout_secs, + } + sub[recipe_kind] = recipe_cmd + snapshot = { + "git_url": "stub://localhost", + "default_branch": "main", + "subprojects": [sub], + "languages": ["python"], + } + db.upsert_project( + name="proj", + git_url="stub://localhost", + default_branch="main", + recipe_json=json.dumps(snapshot), + owner_token="o", + ) + job_id = f"job-{int(time.time()*1000)}-{recipe_kind}" + log_path = Path(db.db_path).parent / "jobs" / f"{job_id}.log" + log_path.parent.mkdir(parents=True, exist_ok=True) + db.insert_job( + job_id=job_id, project_name="proj", subproject_path=".", + recipe=recipe_kind, branch="main", + log_path=str(log_path), recipe_snapshot_json=json.dumps(snapshot), + ) + return job_id + + +# ---------- tests --------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_run_succeeds(tmp_path): + db = DB(str(tmp_path / "ct.db")) + ws = _StubWorkspace(tmp_path / "ws") + runner = Runner(db=db, workspace=ws, log_dir=tmp_path / "jobs", max_concurrent=2) + await runner.start() + try: + job_id = _seed_project_and_job(db, recipe_cmd="echo hello && exit 0", recipe_kind="test") + await runner.enqueue(job_id) + await _wait_terminal(db, job_id) + j = db.get_job(job_id) + assert j["status"] == "succeeded" + assert j["exit_code"] == 0 + finally: + await runner.stop() + + +@pytest.mark.asyncio +async def test_run_fails(tmp_path): + db = DB(str(tmp_path / "ct.db")) + ws = _StubWorkspace(tmp_path / "ws") + runner = Runner(db=db, workspace=ws, log_dir=tmp_path / "jobs", max_concurrent=2) + await runner.start() + try: + job_id = _seed_project_and_job(db, recipe_cmd="exit 1", recipe_kind="test") + await runner.enqueue(job_id) + await _wait_terminal(db, job_id) + j = db.get_job(job_id) + assert j["status"] == "failed" + assert j["exit_code"] == 1 + finally: + await runner.stop() + + +@pytest.mark.asyncio +async def test_run_times_out(tmp_path): + db = DB(str(tmp_path / "ct.db")) + ws = _StubWorkspace(tmp_path / "ws") + runner = Runner(db=db, workspace=ws, log_dir=tmp_path / "jobs", max_concurrent=2) + await runner.start() + try: + job_id = _seed_project_and_job( + db, recipe_cmd="sleep 60", timeout_secs=1, recipe_kind="audit" + ) + await runner.enqueue(job_id) + await _wait_terminal(db, job_id, deadline_s=15) + j = db.get_job(job_id) + assert j["status"] == "timed_out" + log_text = Path(j["log_path"]).read_text() + assert "timeout" in log_text + finally: + await runner.stop() + + +@pytest.mark.asyncio +async def test_bounded_concurrency(tmp_path): + """Queue 5 jobs with max_concurrent=2, assert peak in-flight stays at 2.""" + db = DB(str(tmp_path / "ct.db")) + ws = _StubWorkspace(tmp_path / "ws") + runner = Runner(db=db, workspace=ws, log_dir=tmp_path / "jobs", max_concurrent=2) + await runner.start() + try: + ids = [] + for _ in range(5): + job_id = _seed_project_and_job( + db, recipe_cmd="sleep 0.5 && echo ok", timeout_secs=10, recipe_kind="test" + ) + ids.append(job_id) + for j in ids: + await runner.enqueue(j) + for j in ids: + await _wait_terminal(db, j, deadline_s=30) + assert runner.peak_in_flight <= 2 + assert runner.peak_in_flight >= 1 # at least one concurrent run happened + for j in ids: + assert db.get_job(j)["status"] == "succeeded" + finally: + await runner.stop() + + +@pytest.mark.asyncio +async def test_orphaned_running_marked_failed_on_start(tmp_path): + """A row left in 'running' from a previous boot should be flipped to failed.""" + db = DB(str(tmp_path / "ct.db")) + db.insert_token(name="o", bearer="t-orph", is_admin=False, ip_cidrs=None) + db.upsert_project( + name="proj", git_url="stub://x", default_branch="main", + recipe_json="{}", owner_token="o", + ) + db.insert_job( + job_id="orph-1", project_name="proj", subproject_path=".", + recipe="test", branch="main", + log_path=str(tmp_path / "orph.log"), recipe_snapshot_json="{}", + ) + db.mark_job_running("orph-1") + assert db.get_job("orph-1")["status"] == "running" + + ws = _StubWorkspace(tmp_path / "ws") + runner = Runner(db=db, workspace=ws, log_dir=tmp_path / "jobs", max_concurrent=2) + await runner.start() + try: + # start() should have flipped orph-1 to failed + j = db.get_job("orph-1") + assert j["status"] == "failed" + assert j["exit_code"] == -1 + finally: + await runner.stop() + + +@pytest.mark.asyncio +async def test_jobs_finished_hook_fires(tmp_path): + db = DB(str(tmp_path / "ct.db")) + ws = _StubWorkspace(tmp_path / "ws") + runner = Runner(db=db, workspace=ws, log_dir=tmp_path / "jobs", max_concurrent=2) + + seen: list[dict] = [] + + async def cap(event): + seen.append(event) + + runner.add_hook(cap) + await runner.start() + try: + job_id = _seed_project_and_job(db, recipe_cmd="echo h && true", recipe_kind="test") + await runner.enqueue(job_id) + await _wait_terminal(db, job_id) + # Hook fires AFTER mark_job_finished so we have to give the loop a tick. + for _ in range(50): + if seen: + break + await asyncio.sleep(0.05) + assert len(seen) == 1 + assert seen[0]["job_id"] == job_id + assert seen[0]["status"] == "succeeded" + finally: + await runner.stop() + + +# ---------- helpers ------------------------------------------------------- + + +async def _wait_terminal(db: DB, job_id: str, *, deadline_s: float = 15.0) -> None: + deadline = time.monotonic() + deadline_s + while time.monotonic() < deadline: + j = db.get_job(job_id) + if j and j["status"] in ("succeeded", "failed", "timed_out", "cancelled"): + return + await asyncio.sleep(0.1) + raise AssertionError(f"job {job_id} did not reach terminal state within {deadline_s}s")