clawdforge/clawdforge/auth.py
Kayos 44a8fe743f v0.1 — clawdforge service scaffold
LAN-only HTTP service that runs a `claude -p` subprocess on behalf of Sulkta apps.
Bearer token + IP allowlist gated. SQLite-backed token registry + run audit log.

- POST /run               run a prompt, return parsed result
- POST /files             upload a file, get a file_token to attach to /run
- POST /admin/tokens      mint per-app tokens (admin-bootstrap-token gated)
- GET  /admin/tokens      list tokens; DELETE /admin/tokens/<name> revokes one
- GET  /healthz           liveness + claude --version smoke

Container = node:22 + npm-installed @anthropic-ai/claude-code + uvicorn/FastAPI
wrapper. Persistent volumes for /data (sqlite + run staging) and /root/.claude
(subscription auth — survives container rebuilds; auth via 'docker exec -it
clawdforge claude /login' once). Compose binds 192.168.0.5:8800 only — no
public proxy.

First consumer = cauldron (about to land).
2026-04-28 16:46:44 -07:00

71 lines
2.4 KiB
Python

"""Bearer token + IP allowlist enforcement."""
import hmac
import ipaddress

from fastapi import Header, HTTPException, Request

from .store import Store
def _client_ip(request: Request) -> str:
    """Return the peer host of *request*, or ``"0.0.0.0"`` when it has none."""
    peer = request.client
    if peer:
        return peer.host
    # No client info on the request: fall back to the unspecified address.
    return "0.0.0.0"
def ip_in_any(ip_str: str, cidrs: list[str]) -> bool:
    """Return True when *ip_str* is loopback or falls inside any of *cidrs*.

    Args:
        ip_str: Textual IPv4 or IPv6 address. Anything unparsable is treated
            as "not allowed".
        cidrs: Network strings such as ``"192.168.0.0/24"``. Host bits may be
            set (``strict=False``); entries that do not parse are skipped.

    Returns:
        True for loopback addresses regardless of *cidrs*, and for addresses
        contained in at least one valid network; False otherwise.
    """
    try:
        ip = ipaddress.ip_address(ip_str)
    except ValueError:
        return False

    # An IPv4-mapped IPv6 address (``::ffff:a.b.c.d``) denotes the same peer
    # as its embedded IPv4 address, but never matches an IPv4 network because
    # the versions differ. Check both forms so such a peer is still
    # recognised, while IPv6 networks covering the mapped form keep matching.
    candidates = [ip]
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        candidates.append(mapped)

    if any(candidate.is_loopback for candidate in candidates):
        return True

    for cidr in cidrs:
        try:
            network = ipaddress.ip_network(cidr, strict=False)
        except ValueError:
            # Best-effort: an unparsable entry simply cannot match.
            continue
        if any(candidate in network for candidate in candidates):
            return True
    return False
class Auth:
    """Bearer-token and IP-allowlist gate used by the request handlers.

    Constructed once at app startup; holds the config plus a reference to the
    token store.
    """

    def __init__(self, *, store: Store, global_cidrs: list[str], admin_token: str):
        """Keep the collaborators the ``require_*`` checks consult.

        Args:
            store: Token registry; only ``lookup_token`` is called here.
            global_cidrs: CIDR strings checked by ``require_global_ip``.
            admin_token: Secret expected as the bearer token by
                ``require_admin``.
        """
        self.store = store
        self.global_cidrs = global_cidrs
        self.admin_token = admin_token

    @staticmethod
    def _bearer_token(authorization: str | None) -> str:
        """Return the token carried by a ``Bearer <token>`` header value.

        Raises:
            HTTPException: 401 when the header is absent or does not start
                with ``"Bearer "``.
        """
        if not authorization or not authorization.startswith("Bearer "):
            raise HTTPException(401, "missing bearer")
        return authorization[len("Bearer "):].strip()

    def require_global_ip(self, request: Request) -> None:
        """Reject the request unless its client address passes the global list.

        Raises:
            HTTPException: 403 when the address is not allowed.
        """
        ip = _client_ip(request)
        if not ip_in_any(ip, self.global_cidrs):
            raise HTTPException(403, f"ip not in allowlist: {ip}")

    def require_admin(self, request: Request, authorization: str | None) -> None:
        """Require an allowed client address and the admin bearer token.

        Raises:
            HTTPException: 403 for a disallowed address or a wrong token,
                401 when no bearer credential is presented.
        """
        self.require_global_ip(request)
        token = self._bearer_token(authorization)
        # Fail closed when no admin token is configured: otherwise an empty
        # configured secret would compare equal to an empty presented token.
        if not self.admin_token or not _const_eq(token, self.admin_token):
            raise HTTPException(403, "admin auth failed")

    def require_app(self, request: Request, authorization: str | None) -> dict:
        """Require an allowed client address and a known app token.

        Returns {'name': ..., 'ip_cidrs': [...]} on success.

        Raises:
            HTTPException: 403 for a disallowed address (global or per-app)
                or an unknown/disabled token, 401 when no bearer credential
                is presented.
        """
        self.require_global_ip(request)
        token = self._bearer_token(authorization)
        rec = self.store.lookup_token(token)
        if not rec:
            raise HTTPException(403, "unknown or disabled token")
        # A per-app allowlist is optional; it is only enforced when non-empty.
        if rec["ip_cidrs"]:
            ip = _client_ip(request)
            if not ip_in_any(ip, rec["ip_cidrs"]):
                raise HTTPException(403, f"ip not in app allowlist: {ip}")
        return rec
def _const_eq(a: str, b: str) -> bool:
    """Return True when *a* and *b* are equal, using a timing-safe comparison.

    Delegates to ``hmac.compare_digest`` rather than a hand-written loop, so
    the comparison time does not depend on where the two values first differ.

    Args:
        a: First string (e.g. the presented token).
        b: Second string (e.g. the expected secret).

    Returns:
        True only when both strings are identical.
    """
    # compare_digest accepts str only for pure-ASCII input, so compare the
    # encoded bytes. "surrogatepass" keeps the encoding total: a value that
    # carries lone surrogates compares normally instead of raising.
    return hmac.compare_digest(
        a.encode("utf-8", "surrogatepass"),
        b.encode("utf-8", "surrogatepass"),
    )