"""Authentication helpers for adacam-api.

Auth model (rewritten 2026-06-13, estate-audit CRIT fix):

* The API token is a high-entropy RANDOM secret persisted on the device at
  TOKEN_PATH — it is NOT derived from the device serial. The serial is handed
  out unauthenticated, so deriving the token from it (the old scheme) meant
  anyone who could reach :5000 could compute the token. That is fixed here.
* The token is only obtainable by the app through the one-shot PAIRING WINDOW
  (see /pair in app.py): the window must be explicitly opened on the device
  (`adacam-pair`, or install.sh on first provision), /pair returns the token
  once, then the window closes. A closed window => /pair refuses.
* Bearer comparison is constant-time (hmac.compare_digest).
"""

import os
import secrets
import hmac
from functools import wraps
from flask import request, jsonify

TOKEN_PATH = '/data/adacam/api_token'
PAIRING_FLAG = '/data/adacam/pairing_open'


def get_device_serial() -> str:
    """Return the device serial from the install-generated file.

    The serial is a non-secret identifier. If the file cannot be read or
    decoded, the literal string ``'unknown'`` is returned instead of raising.
    """
    try:
        # Context manager so the handle is closed deterministically.
        with open('/data/adacam/device_serial') as f:
            return f.read().strip()
    except (OSError, ValueError):
        # OSError: missing/unreadable file; ValueError: undecodable bytes.
        return 'unknown'


def get_api_token() -> str:
    """Return the device's random API token, generating one on first use.

    Generated lazily so a freshly-provisioned device is self-bootstrapping;
    the value never depends on the serial. Stored 0600.

    Returns:
        The token read from TOKEN_PATH (whitespace-stripped), or a newly
        generated token if the file is missing, unreadable, or empty.

    Raises:
        OSError: if a new token has to be generated but cannot be persisted.
    """
    try:
        with open(TOKEN_PATH) as f:
            tok = f.read().strip()
    except (OSError, ValueError):
        # Missing, unreadable or undecodable file: fall through and regenerate.
        tok = ''
    if tok:
        return tok

    tok = secrets.token_urlsafe(32)
    os.makedirs(os.path.dirname(TOKEN_PATH), exist_ok=True)
    fd = os.open(TOKEN_PATH, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        # The mode passed to os.open() only applies when the file is created;
        # a pre-existing (e.g. empty) file keeps its old permissions. Tighten
        # explicitly *before* the secret is written.
        os.fchmod(f.fileno(), 0o600)
        f.write(tok)
    return tok


def pairing_open() -> bool:
    """Return True if the pairing-window flag file (PAIRING_FLAG) exists."""
    return os.path.exists(PAIRING_FLAG)


def close_pairing() -> None:
    """Close the pairing window (one-shot — called after a successful /pair).

    Removes PAIRING_FLAG; a flag that is already absent is not an error.
    """
    try:
        os.remove(PAIRING_FLAG)
    except FileNotFoundError:
        pass


def _tokens_match(presented: str, expected: str) -> bool:
    """Constant-time equality check for two token strings.

    hmac.compare_digest() raises TypeError when given ``str`` arguments that
    contain non-ASCII characters, so both sides are encoded to bytes first.
    That way a malformed client header yields a clean mismatch instead of an
    unhandled exception.
    """
    return hmac.compare_digest(
        presented.encode('utf-8', 'surrogatepass'),
        expected.encode('utf-8', 'surrogatepass'),
    )


def require_auth(f):
    """Decorator: require a valid Bearer token, compared in constant time.

    The token is taken from the ``Authorization`` header: the text after a
    ``'Bearer '`` prefix if present, otherwise the whole header value. On a
    missing or non-matching token the wrapped view is not called and a JSON
    ``{'error': 'unauthorized'}`` body with status 401 is returned.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.headers.get('Authorization', '')
        token = auth[7:].strip() if auth.startswith('Bearer ') else auth.strip()
        # Empty token is rejected up front; otherwise compare as bytes so a
        # non-ASCII header produces 401 rather than a TypeError (HTTP 500).
        if not token or not _tokens_match(token, get_api_token()):
            return jsonify({'error': 'unauthorized'}), 401
        return f(*args, **kwargs)
    return decorated