All checks were successful
gitleaks / scan (push) Successful in 35s
The bearer token was sha256(serial)[:32] and the serial is served unauthenticated, so anyone reaching :5000 could compute it and take the device over. Now: token is a random secrets.token_urlsafe(32) at /data/adacam/api_token (never derived from serial); /pair only returns it during a one-shot pairing window (/data/adacam/pairing_open, opened by adacam-pair or install.sh, closes after one pair); require_auth uses hmac.compare_digest. NEEDS ON-DEVICE PAIRING TEST before merge to main — see SECURITY-PAIRING.md.
74 lines
2.6 KiB
Python
74 lines
2.6 KiB
Python
"""Authentication helpers for adacam-api.
|
|
|
|
Auth model (rewritten 2026-06-13, estate-audit CRIT fix):
|
|
* The API token is a high-entropy RANDOM secret persisted on the device at
|
|
TOKEN_PATH — it is NOT derived from the device serial. The serial is handed
|
|
out unauthenticated, so deriving the token from it (the old scheme) meant
|
|
anyone who could reach :5000 could compute the token. That is fixed here.
|
|
* The token is only obtainable by the app through the one-shot PAIRING WINDOW
|
|
(see /pair in app.py): the window must be explicitly opened on the device
|
|
(`adacam-pair`, or install.sh on first provision), /pair returns the token
|
|
once, then the window closes. A closed window => /pair refuses.
|
|
* Bearer comparison is constant-time (hmac.compare_digest).
|
|
"""
|
|
import os
|
|
import secrets
|
|
import hmac
|
|
from functools import wraps
|
|
from flask import request, jsonify
|
|
|
|
TOKEN_PATH = '/data/adacam/api_token'
|
|
PAIRING_FLAG = '/data/adacam/pairing_open'
|
|
|
|
|
|
def get_device_serial():
|
|
"""Get device serial from the install-generated file (non-secret identifier)."""
|
|
try:
|
|
return open('/data/adacam/device_serial').read().strip()
|
|
except Exception:
|
|
return 'unknown'
|
|
|
|
|
|
def get_api_token():
|
|
"""Return the device's random API token, generating one on first use.
|
|
|
|
Generated lazily so a freshly-provisioned device is self-bootstrapping; the
|
|
value never depends on the serial. Stored 0600.
|
|
"""
|
|
try:
|
|
tok = open(TOKEN_PATH).read().strip()
|
|
if tok:
|
|
return tok
|
|
except Exception:
|
|
pass
|
|
tok = secrets.token_urlsafe(32)
|
|
os.makedirs(os.path.dirname(TOKEN_PATH), exist_ok=True)
|
|
fd = os.open(TOKEN_PATH, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
|
|
with os.fdopen(fd, 'w') as f:
|
|
f.write(tok)
|
|
return tok
|
|
|
|
|
|
def pairing_open():
|
|
"""True if the device is currently accepting a pairing (window flag present)."""
|
|
return os.path.exists(PAIRING_FLAG)
|
|
|
|
|
|
def close_pairing():
|
|
"""Close the pairing window (one-shot — called after a successful /pair)."""
|
|
try:
|
|
os.remove(PAIRING_FLAG)
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
|
|
def require_auth(f):
|
|
"""Decorator: require a valid Bearer token, compared in constant time."""
|
|
@wraps(f)
|
|
def decorated(*args, **kwargs):
|
|
auth = request.headers.get('Authorization', '')
|
|
token = auth[7:].strip() if auth.startswith('Bearer ') else auth.strip()
|
|
if not token or not hmac.compare_digest(token, get_api_token()):
|
|
return jsonify({'error': 'unauthorized'}), 401
|
|
return f(*args, **kwargs)
|
|
return decorated
|