URLs, mount paths, and LAN host bindings parameterized via env or relative paths
so the repo stands up from a clean clone anywhere. Drop cross-codebase refs
("mirrors clawdforge's pattern"), Sulkta-Coop client/merchant test fixtures,
and audit-changelog scaffolding from comments. README terser, technical content
preserved.
151 lines
5.4 KiB
Python
151 lines
5.4 KiB
Python
"""Bearer + IP allowlist authentication.
|
|
|
|
- Bearer tokens hashed at rest (SHA-256). No plaintext stored.
|
|
- Per-token IP allowlist (CIDR list). NULL means "any RFC1918 + loopback"
|
|
via the global LAN allowlist.
|
|
- Admin tokens are flagged in the tokens table — server-side admin checks
|
|
query `is_admin` rather than comparing to a bootstrap string.
|
|
- Loopback always allowed (test client uses 127.0.0.1; FastAPI's
|
|
`request.client.host` returns 'testclient' under TestClient and we patch
|
|
that in tests).
|
|
- Bearer tokens NEVER appear in error messages or log lines.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import ipaddress
|
|
import logging
|
|
import secrets
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from fastapi import HTTPException, Request
|
|
|
|
from .db import DB
|
|
|
|
|
|
log = logging.getLogger("crafting_table.auth")
|
|
|
|
ADMIN_TOKEN_NAME = "admin"
|
|
ADMIN_TOKEN_PREFIX = "ct_"
|
|
|
|
|
|
@dataclass
|
|
class AppToken:
|
|
name: str
|
|
is_admin: bool
|
|
ip_cidrs: list[str] | None # None = use global LAN allowlist
|
|
|
|
|
|
def _client_ip(request: Request) -> str:
|
|
"""Extract the client IP from a request. Tests monkeypatch this."""
|
|
return request.client.host if request.client else "0.0.0.0"
|
|
|
|
|
|
def _ip_in_any(ip_str: str, cidrs: list[str]) -> bool:
|
|
try:
|
|
ip = ipaddress.ip_address(ip_str)
|
|
except ValueError:
|
|
return False
|
|
if ip.is_loopback:
|
|
return True
|
|
for cidr in cidrs:
|
|
try:
|
|
if ip in ipaddress.ip_network(cidr, strict=False):
|
|
return True
|
|
except ValueError:
|
|
continue
|
|
return False
|
|
|
|
|
|
def _const_eq(a: str, b: str) -> bool:
|
|
if len(a) != len(b):
|
|
return False
|
|
diff = 0
|
|
for x, y in zip(a.encode(), b.encode()):
|
|
diff |= x ^ y
|
|
return diff == 0
|
|
|
|
|
|
class Auth:
|
|
"""Holds DB ref + global LAN CIDRs. Construct once at startup."""
|
|
|
|
def __init__(self, *, db: DB, lan_cidrs: list[str] | tuple[str, ...]):
|
|
self.db = db
|
|
self.lan_cidrs = list(lan_cidrs)
|
|
|
|
# ---------- bootstrap ---------------------------------------------------
|
|
|
|
def bootstrap_admin(self, admin_bearer_path: Path) -> str:
|
|
"""Mint admin token if none exists, write plaintext bearer to disk
|
|
(chmod 600). Subsequent boots reuse the existing token.
|
|
|
|
Returns the path-side bearer (read from disk) — not necessarily what
|
|
we just minted, since another process may have raced us.
|
|
"""
|
|
admin_bearer_path = Path(admin_bearer_path)
|
|
admin_bearer_path.parent.mkdir(parents=True, exist_ok=True)
|
|
existing = self.db.get_token(ADMIN_TOKEN_NAME)
|
|
|
|
if existing is not None and admin_bearer_path.exists():
|
|
return admin_bearer_path.read_text(encoding="utf-8").strip()
|
|
|
|
if existing is not None:
|
|
# Token row exists but the file is gone — we cannot recover the
|
|
# plaintext (it was hashed at insert). Revoke and re-mint.
|
|
log.warning("admin token row exists but bearer file is missing; rotating")
|
|
self.db.revoke_token(ADMIN_TOKEN_NAME)
|
|
# Renaming the existing row would be cleaner, but revoke + new
|
|
# row keeps the audit trail of past admin tokens.
|
|
new_name = f"{ADMIN_TOKEN_NAME}-rotated-{int(__import__('time').time())}"
|
|
with self.db._conn() as c:
|
|
c.execute("UPDATE tokens SET name=? WHERE name=?", (new_name, ADMIN_TOKEN_NAME))
|
|
|
|
bearer = ADMIN_TOKEN_PREFIX + secrets.token_urlsafe(32)
|
|
self.db.insert_token(
|
|
name=ADMIN_TOKEN_NAME,
|
|
bearer=bearer,
|
|
is_admin=True,
|
|
ip_cidrs=None,
|
|
)
|
|
admin_bearer_path.write_text(bearer + "\n", encoding="utf-8")
|
|
admin_bearer_path.chmod(0o600)
|
|
log.info("admin bearer written to %s (chmod 600)", admin_bearer_path)
|
|
return bearer
|
|
|
|
# ---------- guards ------------------------------------------------------
|
|
|
|
def require_global_ip(self, request: Request) -> None:
|
|
ip = _client_ip(request)
|
|
if not _ip_in_any(ip, self.lan_cidrs):
|
|
raise HTTPException(403, f"ip not in LAN allowlist: {ip}")
|
|
|
|
def require_app(self, request: Request, authorization: str | None) -> AppToken:
|
|
"""Returns AppToken on success. Raises 401/403 on failure.
|
|
|
|
We check the global LAN allowlist FIRST (cheap, doesn't touch DB) so
|
|
wide-area scanners don't even cause a token lookup.
|
|
"""
|
|
self.require_global_ip(request)
|
|
if not authorization or not authorization.startswith("Bearer "):
|
|
raise HTTPException(401, "missing bearer")
|
|
bearer = authorization[7:].strip()
|
|
if not bearer:
|
|
raise HTTPException(401, "empty bearer")
|
|
rec = self.db.lookup_token_by_bearer(bearer)
|
|
if rec is None:
|
|
# Note: do NOT echo the bearer back. Generic message.
|
|
raise HTTPException(403, "unknown or revoked token")
|
|
|
|
# Per-token IP allowlist takes precedence over global LAN if set.
|
|
if rec["ip_cidrs"]:
|
|
ip = _client_ip(request)
|
|
if not _ip_in_any(ip, rec["ip_cidrs"]):
|
|
raise HTTPException(403, f"ip not in app allowlist: {ip}")
|
|
|
|
return AppToken(name=rec["name"], is_admin=rec["is_admin"], ip_cidrs=rec["ip_cidrs"])
|
|
|
|
def require_admin(self, request: Request, authorization: str | None) -> AppToken:
|
|
tok = self.require_app(request, authorization)
|
|
if not tok.is_admin:
|
|
raise HTTPException(403, "admin auth failed")
|
|
return tok
|