crafting-table/crafting_table/auth.py
Cobb Hayes b335405c02 Public-flip audit: generalize internal hosts/paths + drop Sulkta-internal refs
URLs, mount paths, and LAN host bindings parameterized via env or relative paths
so the repo stands up from a clean clone anywhere. Drop cross-codebase refs
("mirrors clawdforge's pattern"), Sulkta-Coop client/merchant test fixtures,
and audit-changelog scaffolding from comments. README terser, technical content
preserved.
2026-05-27 11:25:47 -07:00

151 lines
5.4 KiB
Python

"""Bearer + IP allowlist authentication.
- Bearer tokens hashed at rest (SHA-256). No plaintext stored.
- Per-token IP allowlist (CIDR list). NULL means "any RFC1918 + loopback"
via the global LAN allowlist.
- Admin tokens are flagged in the tokens table — server-side admin checks
query `is_admin` rather than comparing to a bootstrap string.
- Loopback always allowed (test client uses 127.0.0.1; FastAPI's
`request.client.host` returns 'testclient' under TestClient and we patch
that in tests).
- Bearer tokens NEVER appear in error messages or log lines.
"""
from __future__ import annotations
import ipaddress
import logging
import secrets
from dataclasses import dataclass
from pathlib import Path
from fastapi import HTTPException, Request
from .db import DB
log = logging.getLogger("crafting_table.auth")
ADMIN_TOKEN_NAME = "admin"
ADMIN_TOKEN_PREFIX = "ct_"
@dataclass
class AppToken:
name: str
is_admin: bool
ip_cidrs: list[str] | None # None = use global LAN allowlist
def _client_ip(request: Request) -> str:
"""Extract the client IP from a request. Tests monkeypatch this."""
return request.client.host if request.client else "0.0.0.0"
def _ip_in_any(ip_str: str, cidrs: list[str]) -> bool:
try:
ip = ipaddress.ip_address(ip_str)
except ValueError:
return False
if ip.is_loopback:
return True
for cidr in cidrs:
try:
if ip in ipaddress.ip_network(cidr, strict=False):
return True
except ValueError:
continue
return False
def _const_eq(a: str, b: str) -> bool:
if len(a) != len(b):
return False
diff = 0
for x, y in zip(a.encode(), b.encode()):
diff |= x ^ y
return diff == 0
class Auth:
"""Holds DB ref + global LAN CIDRs. Construct once at startup."""
def __init__(self, *, db: DB, lan_cidrs: list[str] | tuple[str, ...]):
self.db = db
self.lan_cidrs = list(lan_cidrs)
# ---------- bootstrap ---------------------------------------------------
def bootstrap_admin(self, admin_bearer_path: Path) -> str:
"""Mint admin token if none exists, write plaintext bearer to disk
(chmod 600). Subsequent boots reuse the existing token.
Returns the path-side bearer (read from disk) — not necessarily what
we just minted, since another process may have raced us.
"""
admin_bearer_path = Path(admin_bearer_path)
admin_bearer_path.parent.mkdir(parents=True, exist_ok=True)
existing = self.db.get_token(ADMIN_TOKEN_NAME)
if existing is not None and admin_bearer_path.exists():
return admin_bearer_path.read_text(encoding="utf-8").strip()
if existing is not None:
# Token row exists but the file is gone — we cannot recover the
# plaintext (it was hashed at insert). Revoke and re-mint.
log.warning("admin token row exists but bearer file is missing; rotating")
self.db.revoke_token(ADMIN_TOKEN_NAME)
# Renaming the existing row would be cleaner, but revoke + new
# row keeps the audit trail of past admin tokens.
new_name = f"{ADMIN_TOKEN_NAME}-rotated-{int(__import__('time').time())}"
with self.db._conn() as c:
c.execute("UPDATE tokens SET name=? WHERE name=?", (new_name, ADMIN_TOKEN_NAME))
bearer = ADMIN_TOKEN_PREFIX + secrets.token_urlsafe(32)
self.db.insert_token(
name=ADMIN_TOKEN_NAME,
bearer=bearer,
is_admin=True,
ip_cidrs=None,
)
admin_bearer_path.write_text(bearer + "\n", encoding="utf-8")
admin_bearer_path.chmod(0o600)
log.info("admin bearer written to %s (chmod 600)", admin_bearer_path)
return bearer
# ---------- guards ------------------------------------------------------
def require_global_ip(self, request: Request) -> None:
ip = _client_ip(request)
if not _ip_in_any(ip, self.lan_cidrs):
raise HTTPException(403, f"ip not in LAN allowlist: {ip}")
def require_app(self, request: Request, authorization: str | None) -> AppToken:
"""Returns AppToken on success. Raises 401/403 on failure.
We check the global LAN allowlist FIRST (cheap, doesn't touch DB) so
wide-area scanners don't even cause a token lookup.
"""
self.require_global_ip(request)
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(401, "missing bearer")
bearer = authorization[7:].strip()
if not bearer:
raise HTTPException(401, "empty bearer")
rec = self.db.lookup_token_by_bearer(bearer)
if rec is None:
# Note: do NOT echo the bearer back. Generic message.
raise HTTPException(403, "unknown or revoked token")
# Per-token IP allowlist takes precedence over global LAN if set.
if rec["ip_cidrs"]:
ip = _client_ip(request)
if not _ip_in_any(ip, rec["ip_cidrs"]):
raise HTTPException(403, f"ip not in app allowlist: {ip}")
return AppToken(name=rec["name"], is_admin=rec["is_admin"], ip_cidrs=rec["ip_cidrs"])
def require_admin(self, request: Request, authorization: str | None) -> AppToken:
tok = self.require_app(request, authorization)
if not tok.is_admin:
raise HTTPException(403, "admin auth failed")
return tok