"""Bearer + IP allowlist authentication. - Bearer tokens hashed at rest (SHA-256). No plaintext stored. - Per-token IP allowlist (CIDR list). NULL means "any RFC1918 + loopback" via the global LAN allowlist. - Admin tokens are flagged in the tokens table — server-side admin checks query `is_admin` rather than comparing to a bootstrap string. - Loopback always allowed (test client uses 127.0.0.1; FastAPI's `request.client.host` returns 'testclient' under TestClient and we patch that in tests). - Bearer tokens NEVER appear in error messages or log lines. """ from __future__ import annotations import ipaddress import logging import secrets from dataclasses import dataclass from pathlib import Path from fastapi import HTTPException, Request from .db import DB log = logging.getLogger("crafting_table.auth") ADMIN_TOKEN_NAME = "admin" ADMIN_TOKEN_PREFIX = "ct_" @dataclass class AppToken: name: str is_admin: bool ip_cidrs: list[str] | None # None = use global LAN allowlist def _client_ip(request: Request) -> str: """Extract the client IP from a request. Tests monkeypatch this.""" return request.client.host if request.client else "0.0.0.0" def _ip_in_any(ip_str: str, cidrs: list[str]) -> bool: try: ip = ipaddress.ip_address(ip_str) except ValueError: return False if ip.is_loopback: return True for cidr in cidrs: try: if ip in ipaddress.ip_network(cidr, strict=False): return True except ValueError: continue return False def _const_eq(a: str, b: str) -> bool: if len(a) != len(b): return False diff = 0 for x, y in zip(a.encode(), b.encode()): diff |= x ^ y return diff == 0 class Auth: """Holds DB ref + global LAN CIDRs. Construct once at startup.""" def __init__(self, *, db: DB, lan_cidrs: list[str] | tuple[str, ...]): self.db = db self.lan_cidrs = list(lan_cidrs) # ---------- bootstrap --------------------------------------------------- def bootstrap_admin(self, admin_bearer_path: Path) -> str: """Mint admin token if none exists, write plaintext bearer to disk (chmod 600). Subsequent boots reuse the existing token. Returns the path-side bearer (read from disk) — not necessarily what we just minted, since another process may have raced us. """ admin_bearer_path = Path(admin_bearer_path) admin_bearer_path.parent.mkdir(parents=True, exist_ok=True) existing = self.db.get_token(ADMIN_TOKEN_NAME) if existing is not None and admin_bearer_path.exists(): return admin_bearer_path.read_text(encoding="utf-8").strip() if existing is not None: # Token row exists but the file is gone — we cannot recover the # plaintext (it was hashed at insert). Revoke and re-mint. log.warning("admin token row exists but bearer file is missing; rotating") self.db.revoke_token(ADMIN_TOKEN_NAME) # Renaming the existing row would be cleaner, but revoke + new # row keeps the audit trail of past admin tokens. new_name = f"{ADMIN_TOKEN_NAME}-rotated-{int(__import__('time').time())}" with self.db._conn() as c: c.execute("UPDATE tokens SET name=? WHERE name=?", (new_name, ADMIN_TOKEN_NAME)) bearer = ADMIN_TOKEN_PREFIX + secrets.token_urlsafe(32) self.db.insert_token( name=ADMIN_TOKEN_NAME, bearer=bearer, is_admin=True, ip_cidrs=None, ) admin_bearer_path.write_text(bearer + "\n", encoding="utf-8") admin_bearer_path.chmod(0o600) log.info("admin bearer written to %s (chmod 600)", admin_bearer_path) return bearer # ---------- guards ------------------------------------------------------ def require_global_ip(self, request: Request) -> None: ip = _client_ip(request) if not _ip_in_any(ip, self.lan_cidrs): raise HTTPException(403, f"ip not in LAN allowlist: {ip}") def require_app(self, request: Request, authorization: str | None) -> AppToken: """Returns AppToken on success. Raises 401/403 on failure. We check the global LAN allowlist FIRST (cheap, doesn't touch DB) so wide-area scanners don't even cause a token lookup. """ self.require_global_ip(request) if not authorization or not authorization.startswith("Bearer "): raise HTTPException(401, "missing bearer") bearer = authorization[7:].strip() if not bearer: raise HTTPException(401, "empty bearer") rec = self.db.lookup_token_by_bearer(bearer) if rec is None: # Note: do NOT echo the bearer back. Generic message. raise HTTPException(403, "unknown or revoked token") # Per-token IP allowlist takes precedence over global LAN if set. if rec["ip_cidrs"]: ip = _client_ip(request) if not _ip_in_any(ip, rec["ip_cidrs"]): raise HTTPException(403, f"ip not in app allowlist: {ip}") return AppToken(name=rec["name"], is_admin=rec["is_admin"], ip_cidrs=rec["ip_cidrs"]) def require_admin(self, request: Request, authorization: str | None) -> AppToken: tok = self.require_app(request, authorization) if not tok.is_admin: raise HTTPException(403, "admin auth failed") return tok