"""FastAPI app — port 8810. The HTTP surface for crafting-table. Authentication model: - Every request needs `Authorization: Bearer `. - The bearer is hashed and looked up in the tokens table. - Tokens are flagged is_admin=1 or 0. Admin can do everything. - Per-app tokens (is_admin=0) can register projects (becoming the owner) and only see/touch projects where owner_token matches their name. - Cross-token project access returns 404 (NOT 403) — existence-leak guard. Endpoints: - GET /healthz — public-ish (still needs LAN IP) - POST /admin/tokens — admin only - GET /admin/tokens — admin only - DELETE /admin/tokens/{name} — admin only - POST /projects — any token (becomes owner) - GET /projects — caller's projects (or all if admin) - GET /projects/{name} — visibility-gated, 404 on cross-token - PUT /projects/{name} — owner or admin only - DELETE /projects/{name} — owner or admin only; cascades jobs+findings - POST /projects/{name}/jobs — visibility-gated; enqueues a job - GET /jobs — caller's jobs (or all if admin) - GET /jobs/{id} — owner or admin; returns last 200 log lines - GET /jobs/{id}/log — owner or admin; full log file stream - GET /jobs/{id}/findings — owner or admin; empty list in wave 1 """ from __future__ import annotations import asyncio import json import logging import time import uuid from contextlib import asynccontextmanager from pathlib import Path from typing import Annotated from fastapi import FastAPI, Header, HTTPException, Request from fastapi.responses import FileResponse, JSONResponse from .auth import Auth, AppToken from .config import Config, load from .db import DB from .digest import DigestScheduler, SmtpConfig from .models import ( CreateJobRequest, Project, TokenCreateRequest, ) from .patcher import Patcher, PatcherConfig from .runner import Runner from .workspace import WorkspaceManager log = logging.getLogger("crafting_table") if not log.handlers: logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s 
%(message)s") # ---------- module-level singletons (rebuilt per test via fixture) ---------- cfg: Config = load() db: DB = DB(cfg.db_path) auth: Auth = Auth(db=db, lan_cidrs=cfg.lan_cidrs) workspace: WorkspaceManager = WorkspaceManager(cfg.workspace_root) runner: Runner = Runner( db=db, workspace=workspace, log_dir=cfg.log_dir, max_concurrent=cfg.max_concurrent_jobs, default_timeout_secs=cfg.default_job_timeout_secs, gc_interval_secs=cfg.workspace_gc_interval_secs, gc_age_secs=cfg.workspace_gc_age_secs, ) # SMTP-driven; if CRAFTING_SMTP_HOST is unset the scheduler stays None and the # lifespan logs a "digest disabled" warning. Endpoints still render in dry-run. _smtp_cfg: SmtpConfig | None = SmtpConfig.from_env() digest_scheduler: DigestScheduler = DigestScheduler(db=db, smtp=_smtp_cfg) # Patcher (wave 3): clawdforge + Gitea creds env-driven; if any required env # var is missing, the patcher stays None and the runner hook short-circuits. _patcher_cfg: PatcherConfig | None = PatcherConfig.from_env() patcher: Patcher | None = ( Patcher(db=db, workspace=workspace, config=_patcher_cfg, runner=runner) if _patcher_cfg is not None else None ) # Wire the patcher into the runner's post-job hook. The runner already runs # the parser pipeline before this hook fires, so by the time we land here # the findings rows for `job_id` are committed and pickable. 
async def _maybe_auto_patch_hook(event: dict) -> None:
    """Post-job runner hook: draft a patch when the project opted into auto-patching.

    Returns without doing anything when:
    - the patcher is not configured;
    - the job reported no findings;
    - the project row is gone;
    - its recipe_json is not a JSON object;
    - the recipe's ``notify.auto_patch`` flag is falsy;
    - or the job row is missing.

    Patcher failures are logged, not propagated to the runner.

    Args:
        event: runner event dict; this hook reads ``findings_count``,
            ``project_name`` and ``job_id``.
    """
    if patcher is None:
        return
    # `or 0` so a None count short-circuits instead of raising on `<=`.
    if (event.get("findings_count") or 0) <= 0:
        return
    project_row = await db.arun(db.get_project, event["project_name"])
    if project_row is None:
        return
    try:
        recipe = json.loads(project_row.get("recipe_json") or "{}")
    except json.JSONDecodeError:
        return
    if not isinstance(recipe, dict):
        return
    notify = recipe.get("notify") or {}
    if not isinstance(notify, dict) or not notify.get("auto_patch"):
        return
    job = await db.arun(db.get_job, event["job_id"])
    if job is None:
        return
    try:
        await patcher.maybe_draft_for_job(job)
    except Exception as e:
        log.warning("patcher hook failed for job %s: %s", event["job_id"], e)


runner.add_hook(_maybe_auto_patch_hook)


# ---------- lifespan --------------------------------------------------------

@asynccontextmanager
async def _lifespan(app: FastAPI):
    """Start background services on app startup and stop them on shutdown.

    Startup:
    - bootstraps the admin token from ``cfg.admin_bearer_path``;
    - starts the runner;
    - starts the digest scheduler only when SMTP is configured.

    Shutdown always stops the runner, even if stopping the digest scheduler
    raises.
    """
    auth.bootstrap_admin(cfg.admin_bearer_path)
    await runner.start()
    if _smtp_cfg is None:
        log.warning("digest disabled — CRAFTING_SMTP_HOST not set")
    else:
        await digest_scheduler.start()
        log.info(
            "digest scheduler started: smtp=%s:%d from=%s",
            _smtp_cfg.host, _smtp_cfg.port, _smtp_cfg.from_addr,
        )
    log.info(
        "crafting-table startup: db=%s log_dir=%s max_concurrent=%d port=%d",
        cfg.db_path, cfg.log_dir, cfg.max_concurrent_jobs, cfg.api_port,
    )
    try:
        yield
    finally:
        try:
            await digest_scheduler.stop()
        finally:
            # Nested finally: a failing digest shutdown must not leak the runner.
            await runner.stop()
        log.info("crafting-table shutdown complete")


app = FastAPI(title="crafting-table", version="0.1.0", lifespan=_lifespan)


# ---------- helpers ---------------------------------------------------------

def _project_visible(project_row: dict | None, token: AppToken) -> dict:
    """Return the project row if visible to this token, else raise 404.

    Existence-leak guard: cross-token access yields the same 404 a missing
    project would.
    """
    if project_row is None:
        raise HTTPException(404, "project not found")
    if token.is_admin or project_row["owner_token"] == token.name:
        return project_row
    raise HTTPException(404, "project not found")


def _job_owned_or_404(job_row: dict, project_row: dict | None, token: AppToken) -> dict:
    """Return `job_row` if non-admin `token` owns its parent project, else raise 404."""
    if project_row is None or project_row["owner_token"] != token.name:
        raise HTTPException(404, "job not found")
    return job_row


def _job_visible(job_row: dict | None, token: AppToken) -> dict:
    """Return the job row if visible to this token, else raise 404 (sync variant).

    Kept for existing callers. The parent-project lookup calls db.get_project
    directly rather than through db.arun, so it presumably blocks the calling
    thread. Endpoints use _job_visible_async instead.
    """
    if job_row is None:
        raise HTTPException(404, "job not found")
    if token.is_admin:
        return job_row
    return _job_owned_or_404(job_row, db.get_project(job_row["project_name"]), token)


async def _job_visible_async(job_row: dict | None, token: AppToken) -> dict:
    """Return the job row if visible to this token, else raise 404.

    Admins see every job. Other tokens see a job only when they own its
    parent project. A missing job, a missing project and a foreign owner all
    produce the same 404 (existence-leak guard). The project lookup goes
    through db.arun, like every other DB call in the endpoints.
    """
    if job_row is None:
        raise HTTPException(404, "job not found")
    if token.is_admin:
        return job_row
    project_row = await db.arun(db.get_project, job_row["project_name"])
    return _job_owned_or_404(job_row, project_row, token)


def _project_to_api(row: dict) -> dict:
    """Inflate a DB row + recipe_json into the API-shaped Project dict.

    The column-backed fields (name, git_url, default_branch, created_at,
    updated_at) override any same-named keys inside recipe_json.
    """
    # Empty/NULL recipe_json is treated as an empty recipe rather than
    # crashing json.loads (the post-job hook uses the same fallback).
    recipe = json.loads(row["recipe_json"] or "{}")
    recipe["name"] = row["name"]
    recipe["git_url"] = row["git_url"]
    recipe["default_branch"] = row["default_branch"]
    recipe["created_at"] = row["created_at"]
    recipe["updated_at"] = row["updated_at"]
    return recipe


def _project_recipe_blob(p: Project) -> str:
    """Serialize the parts of Project we store inside recipe_json.

    Omits the fields that get their own columns: name, git_url,
    default_branch, created_at, updated_at.
    """
    return json.dumps({
        "languages": p.languages,
        "subprojects": [s.model_dump() for s in p.subprojects],
        "schedule": p.schedule.model_dump(),
        "notify": p.notify.model_dump(),
    })


def _pick_subproject(subprojects: list, path: str | None, recipe_kind) -> dict:
    """Select the subproject a job should run in, or raise 400.

    With an explicit `path`, pick the entry whose "path" matches it.
    Otherwise pick the first entry with a non-empty `recipe_kind` command.
    Either way, the chosen entry must define a non-empty `recipe_kind` command.
    """
    if path is not None:
        chosen = next((s for s in subprojects if s.get("path") == path), None)
        if chosen is None:
            raise HTTPException(400, f"subproject '{path}' not found in project")
    else:
        chosen = next((s for s in subprojects if s.get(recipe_kind)), None)
        if chosen is None:
            raise HTTPException(400, f"no subproject defines a '{recipe_kind}' command")
    if not chosen.get(recipe_kind):
        raise HTTPException(400, f"subproject '{chosen.get('path', '.')}' has no '{recipe_kind}' command")
    return chosen


# ---------- endpoints -------------------------------------------------------

@app.get("/healthz")
async def healthz(request: Request):
    """Liveness probe: IP-gated via auth.require_global_ip, no bearer needed.

    Always answers ``ok: True``. The ``db`` field reports whether a cheap
    query (applied_migrations) succeeded, and ``runner`` carries runner.stats().
    """
    auth.require_global_ip(request)
    # Cheap liveness — DB query that exercises the connection.
    try:
        await db.arun(db.applied_migrations)
        db_ok = True
    except Exception as e:
        db_ok = False
        log.warning("healthz db check failed: %s", e)
    return {
        "ok": True,
        "db": "ok" if db_ok else "fail",
        "runner": runner.stats(),
        "version": "0.1.0",
    }


# ---- /admin/tokens ---------------------------------------------------------

@app.post("/admin/tokens")
async def admin_create_token(
    request: Request,
    body: TokenCreateRequest,
    authorization: Annotated[str | None, Header()] = None,
):
    """Mint a new bearer token (admin only); the plaintext bearer is returned once.

    Raises:
        HTTPException(409): the insert failed (e.g. the name is taken). Only
            the exception type name is exposed.
    """
    auth.require_admin(request, authorization)
    import secrets  # CSPRNG for bearer material (never `random`)

    prefix = "ctadmin_" if body.is_admin else "ct_"
    bearer = prefix + secrets.token_urlsafe(32)
    try:
        await db.arun(
            db.insert_token,
            name=body.name,
            bearer=bearer,
            is_admin=body.is_admin,
            ip_cidrs=body.ip_cidrs or None,
        )
    except Exception as e:
        # UNIQUE-violation, etc. Don't leak DB internals.
        raise HTTPException(409, f"token create failed: {type(e).__name__}") from e
    return {
        "ok": True,
        "name": body.name,
        "bearer": bearer,
        "is_admin": body.is_admin,
        "ip_cidrs": body.ip_cidrs,
    }


@app.get("/admin/tokens")
async def admin_list_tokens(
    request: Request,
    authorization: Annotated[str | None, Header()] = None,
):
    """List the rows returned by db.list_tokens (admin only)."""
    auth.require_admin(request, authorization)
    rows = await db.arun(db.list_tokens)
    return {"ok": True, "tokens": rows}


@app.delete("/admin/tokens/{name}")
async def admin_revoke_token(
    name: str,
    request: Request,
    authorization: Annotated[str | None, Header()] = None,
):
    """Revoke a token by name (admin only).

    The token named "admin" cannot be revoked here (400). An unknown or
    already-revoked name yields 404.
    """
    auth.require_admin(request, authorization)
    if name == "admin":
        raise HTTPException(400, "cannot revoke the admin token via API")
    revoked = await db.arun(db.revoke_token, name)
    if not revoked:
        raise HTTPException(404, "token not found or already revoked")
    return {"ok": True}


# ---- /projects -------------------------------------------------------------

@app.post("/projects")
async def register_project(
    request: Request,
    body: Project,
    authorization: Annotated[str | None, Header()] = None,
):
    """Register a new project owned by the calling token.

    If the name already exists:
    - a non-admin caller who does not own it gets 404, so the endpoint
      doesn't leak that the name is taken under a different token;
    - admins and the owner get 409 ("use PUT to update"). An admin who wants
      to take a project over should DELETE then re-POST, or PUT.
    """
    tok = auth.require_app(request, authorization)
    existing = await db.arun(db.get_project, body.name)
    if existing is not None:
        if not tok.is_admin and existing["owner_token"] != tok.name:
            raise HTTPException(404, "project not found")
        raise HTTPException(409, "project already exists; use PUT to update")
    # NOTE(review): the existence check above and upsert_project below are
    # separate calls. Two concurrent POSTs for the same new name can both pass
    # the check, and the later upsert presumably wins, owner_token included.
    # Closing that gap needs an insert-only DB primitive. TODO.
    row = await db.arun(
        db.upsert_project,
        name=body.name,
        git_url=body.git_url,
        default_branch=body.default_branch,
        recipe_json=_project_recipe_blob(body),
        owner_token=tok.name,
    )
    return {"ok": True, "project": _project_to_api(row)}


@app.put("/projects/{name}")
async def update_project(
    name: str,
    request: Request,
    body: Project,
    authorization: Annotated[str | None, Header()] = None,
):
    """Replace a project's definition (owner or admin; 404 on cross-token).

    The original owner_token is preserved. The body's name must equal the
    path name (400).
    """
    tok = auth.require_app(request, authorization)
    existing = await db.arun(db.get_project, name)
    _project_visible(existing, tok)
    if body.name != name:
        raise HTTPException(400, "name in body must match path")
    row = await db.arun(
        db.upsert_project,
        name=name,
        git_url=body.git_url,
        default_branch=body.default_branch,
        recipe_json=_project_recipe_blob(body),
        owner_token=existing["owner_token"],
    )
    return {"ok": True, "project": _project_to_api(row)}


@app.delete("/projects/{name}")
async def delete_project(
    name: str,
    request: Request,
    authorization: Annotated[str | None, Header()] = None,
):
    """Delete a project via db.delete_project (owner or admin; 404 on cross-token)."""
    tok = auth.require_app(request, authorization)
    existing = await db.arun(db.get_project, name)
    _project_visible(existing, tok)
    deleted = await db.arun(db.delete_project, name)
    if not deleted:
        raise HTTPException(404, "project not found")
    return {"ok": True}


@app.get("/projects")
async def list_projects(
    request: Request,
    authorization: Annotated[str | None, Header()] = None,
):
    """List the caller's projects, or every project for an admin token."""
    tok = auth.require_app(request, authorization)
    owner = None if tok.is_admin else tok.name
    rows = await db.arun(db.list_projects, owner_token=owner)
    return {"ok": True, "projects": [_project_to_api(r) for r in rows]}


@app.get("/projects/{name}")
async def get_project(
    name: str,
    request: Request,
    authorization: Annotated[str | None, Header()] = None,
):
    """Fetch one project (visibility-gated; 404 on cross-token)."""
    tok = auth.require_app(request, authorization)
    row = await db.arun(db.get_project, name)
    _project_visible(row, tok)
    return {"ok": True, "project": _project_to_api(row)}


# ---- /projects/{name}/jobs -------------------------------------------------

@app.post("/projects/{name}/jobs")
async def create_job(
    name: str,
    request: Request,
    body: CreateJobRequest,
    authorization: Annotated[str | None, Header()] = None,
):
    """Insert a queued job for a project and hand it to the runner.

    The job runs in the subproject chosen by _pick_subproject. The branch
    defaults to the project's default_branch. The log path is
    ``<log_dir>/<job_id>.log``.

    Raises:
        HTTPException(404): project missing or not visible to the caller.
        HTTPException(400): no subprojects, or no subproject can run the
            requested recipe.
    """
    tok = auth.require_app(request, authorization)
    project_row = await db.arun(db.get_project, name)
    _project_visible(project_row, tok)
    recipe = json.loads(project_row["recipe_json"] or "{}")
    subprojects = recipe.get("subprojects", [])
    if not subprojects:
        raise HTTPException(400, "project has no subprojects")
    chosen = _pick_subproject(subprojects, body.subproject, body.recipe)

    job_id = str(uuid.uuid4())
    log_path = str(Path(cfg.log_dir) / f"{job_id}.log")
    branch = body.branch or project_row["default_branch"]
    # Snapshot the recipe at run-time. Future recipe edits don't retcon this
    # job's view of what command should run — every job carries its own
    # frozen copy.
    snapshot = {
        "git_url": project_row["git_url"],
        "default_branch": project_row["default_branch"],
        "subprojects": subprojects,
        "languages": recipe.get("languages", []),
    }
    row = await db.arun(
        db.insert_job,
        job_id=job_id,
        project_name=name,
        subproject_path=chosen.get("path", "."),
        recipe=body.recipe,
        branch=branch,
        log_path=log_path,
        recipe_snapshot_json=json.dumps(snapshot),
    )
    await runner.enqueue(job_id)
    return {"ok": True, "job_id": job_id, "status": "queued", "job": row}


# ---- /jobs -----------------------------------------------------------------

@app.get("/jobs")
async def list_jobs(
    request: Request,
    authorization: Annotated[str | None, Header()] = None,
    project: str | None = None,
    status: str | None = None,
    limit: int = 50,
):
    """List jobs visible to the caller, optionally filtered; limit clamped to [1, 500]."""
    tok = auth.require_app(request, authorization)
    owner = None if tok.is_admin else tok.name
    rows = await db.arun(
        db.list_jobs,
        project_name=project,
        status=status,
        owner_token=owner,
        limit=max(1, min(limit, 500)),
    )
    return {"ok": True, "jobs": rows}


@app.get("/jobs/{id}")
async def get_job(
    id: str,
    request: Request,
    authorization: Annotated[str | None, Header()] = None,
):
    """Fetch a job row plus the last 200 lines of its log (owner or admin).

    A missing log file yields an empty ``log_tail``. A tail read error is
    logged and also yields an empty tail.
    """
    tok = auth.require_app(request, authorization)
    row = await db.arun(db.get_job, id)
    await _job_visible_async(row, tok)
    log_tail: list[str] = []
    log_path = Path(row["log_path"])
    if log_path.exists():
        try:
            # File I/O runs in a worker thread so it doesn't stall the event loop.
            log_tail = await asyncio.to_thread(_tail_lines, log_path, 200)
        except Exception as e:
            log.warning("log tail failed for %s: %s", row["log_path"], e)
    return {"ok": True, "job": row, "log_tail": log_tail}


@app.get("/jobs/{id}/log")
async def get_job_log(
    id: str,
    request: Request,
    authorization: Annotated[str | None, Header()] = None,
):
    """Stream the job's full log file as text/plain (owner or admin; 404 if absent)."""
    tok = auth.require_app(request, authorization)
    row = await db.arun(db.get_job, id)
    await _job_visible_async(row, tok)
    log_path = Path(row["log_path"])
    if not log_path.exists():
        raise HTTPException(404, "log file not present")
    return FileResponse(str(log_path), media_type="text/plain", filename=f"{id}.log")


@app.get("/jobs/{id}/findings")
async def get_job_findings(
    id: str,
    request: Request,
    authorization: Annotated[str | None, Header()] = None,
):
    """Return the findings recorded for a job (owner or admin)."""
    tok = auth.require_app(request, authorization)
    row = await db.arun(db.get_job, id)
    await _job_visible_async(row, tok)
    findings = await db.arun(db.list_findings, id)
    return {"ok": True, "findings": findings}


# ---- /patches --------------------------------------------------------------

@app.post("/jobs/{id}/patches")
async def trigger_patch(
    id: str,
    request: Request,
    authorization: Annotated[str | None, Header()] = None,
    body: dict | None = None,
):
    """Manually trigger a patch attempt against a job.

    body: {"finding_id": int | null}. If finding_id is null/absent we pick
    the highest-severity actionable finding on the job. Returns the resulting
    PatchAttempt as a dict.

    Raises:
        HTTPException(404): job missing or not visible to the caller.
        HTTPException(503): the patcher is not configured
            (CRAFTING_CLAWDFORGE_URL/TOKEN/GITEA_URL/TOKEN missing).
        HTTPException(400): finding_id is not an integer. Booleans are
            rejected too, even though Python treats bool as an int subtype.
        HTTPException(500): the patcher raised.
    """
    tok = auth.require_app(request, authorization)
    job_row = await db.arun(db.get_job, id)
    await _job_visible_async(job_row, tok)
    if patcher is None:
        raise HTTPException(503, "patcher not configured")
    body = body or {}
    finding_id = body.get("finding_id")
    if finding_id is not None and (
        isinstance(finding_id, bool) or not isinstance(finding_id, int)
    ):
        raise HTTPException(400, "finding_id must be an integer or null")
    try:
        attempt = await patcher.maybe_draft(id, finding_id=finding_id)
    except Exception as e:
        log.exception("patch trigger failed: %s", e)
        raise HTTPException(500, f"patch attempt errored: {type(e).__name__}") from e
    if attempt is None:
        return {"ok": True, "attempt": None, "reason": "no_actionable_finding"}
    return {"ok": True, "attempt": _patch_attempt_to_api(attempt)}


@app.get("/patches")
async def list_patches(
    request: Request,
    authorization: Annotated[str | None, Header()] = None,
    project: str | None = None,
    status: str | None = None,
    limit: int = 100,
):
    """List patch attempts visible to the caller; limit clamped to [1, 500]."""
    tok = auth.require_app(request, authorization)
    owner = None if tok.is_admin else tok.name
    rows = await db.arun(
        db.list_patch_attempts,
        project_name=project,
        status=status,
        owner_token=owner,
        limit=max(1, min(limit, 500)),
    )
    return {"ok": True, "patches": rows}


@app.get("/patches/{id}")
async def get_patch(
    id: int,
    request: Request,
    authorization: Annotated[str | None, Header()] = None,
):
    """Fetch one patch attempt (owner or admin).

    A missing attempt, a missing project and a foreign owner all produce the
    same 404.
    """
    tok = auth.require_app(request, authorization)
    row = await db.arun(db.get_patch_attempt, id)
    if row is None:
        raise HTTPException(404, "patch attempt not found")
    # Visibility-gate via the underlying project. A vanished project hides
    # the attempt even from admins.
    project_row = await db.arun(db.get_project, row["project_name"])
    if project_row is None:
        raise HTTPException(404, "patch attempt not found")
    if not tok.is_admin and project_row["owner_token"] != tok.name:
        raise HTTPException(404, "patch attempt not found")
    return {"ok": True, "patch": row}


def _patch_attempt_to_api(attempt) -> dict:
    """Serialize a PatchAttempt dataclass to the wire shape."""
    return {
        "id": attempt.id,
        "finding_id": attempt.finding_id,
        "job_id": attempt.job_id,
        "project_name": attempt.project_name,
        "attempt_number": attempt.attempt_number,
        "status": attempt.status,
        "branch_name": attempt.branch_name,
        "pr_url": attempt.pr_url,
        "diff_excerpt": attempt.diff_excerpt,
        "session_id": attempt.session_id,
        "error": attempt.error,
    }


# ---- /digests --------------------------------------------------------------

@app.get("/digests/{date}")
async def get_digest(
    date: str,
    request: Request,
    authorization: Annotated[str | None, Header()] = None,
):
    """Render the digest body for a given date (YYYY-MM-DD in PT) without sending.

    Admin token required. Wave 2C: this is a dry-run rendering pass so the
    email's "Full log:" link resolves; future waves can persist rendered
    bodies to a blob store and pull them here for archive replay.

    Raises:
        HTTPException(400): `date` is not a real YYYY-MM-DD calendar date.
    """
    auth.require_admin(request, authorization)
    if not _is_iso_date(date):
        raise HTTPException(400, "date must be YYYY-MM-DD")
    summary = await digest_scheduler.run_once(target_date=date, dry_run=True)
    return {"ok": True, "digest": summary}


@app.post("/admin/digest/run-now")
async def admin_digest_run_now(
    request: Request,
    authorization: Annotated[str | None, Header()] = None,
    body: dict | None = None,
):
    """Manual digest trigger. Admin only.

    body={"dry_run": true} skips SMTP. An optional "date" key is passed
    through to digest_scheduler.run_once as target_date.
    """
    auth.require_admin(request, authorization)
    body = body or {}
    dry_run = bool(body.get("dry_run", False))
    target_date = body.get("date")
    summary = await digest_scheduler.run_once(target_date=target_date, dry_run=dry_run)
    return {"ok": True, "digest": summary}


# ---------- helpers ---------------------------------------------------------

def _is_iso_date(value: str) -> bool:
    """Return True iff `value` is a real calendar date spelled exactly YYYY-MM-DD."""
    from datetime import datetime

    # Shape check first: strptime alone would accept single-digit month/day.
    if len(value) != 10 or value[4] != "-" or value[7] != "-" or not value.isascii():
        return False
    try:
        datetime.strptime(value, "%Y-%m-%d")
    except ValueError:
        return False
    return True


def _tail_lines(path: Path, n: int) -> list[str]:
    """Return the last `n` lines of a file without slurping the whole thing.

    Reads backwards in 4 KiB chunks until more than `n` newlines have been
    seen or the start of the file is reached, then decodes as UTF-8, with
    undecodable bytes replaced. A chunk may start mid-line (or mid-character);
    once more than `n` newlines are buffered, that partial leading line is
    dropped by the final slice. Returns [] when n <= 0.
    """
    if n <= 0:
        # Guard: `lines[-0:]` would otherwise return every line read.
        return []
    block = 4096
    chunks: list[bytes] = []  # collected newest-first
    newlines = 0
    with path.open("rb") as fh:
        fh.seek(0, 2)
        pos = fh.tell()
        while pos > 0 and newlines <= n:
            step = min(block, pos)
            pos -= step
            fh.seek(pos)
            chunk = fh.read(step)
            chunks.append(chunk)
            # Count per chunk instead of rescanning the whole buffer each loop.
            newlines += chunk.count(b"\n")
    data = b"".join(reversed(chunks))
    return data.decode("utf-8", "replace").splitlines()[-n:]