adacam/services/adacam-odc/adacam_odc.py

2040 lines
70 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
adacam_odc.py — ADAcam Combined Service (ODC API + ADAMaps Forwarder)
A minimal Python replacement for Hivemapper's odc-api Node.js service,
combined with the ADAMaps detection forwarder in a single process.
Architecture:
- Thread 1 (Flask API): HTTP server on port 5000 — serves landmarks, GPS, health
- Thread 2 (Forwarder): Reads SQLite, batches detections, sends to ADAMaps API
NO Hivemapper phone-home. NO telemetry. NO framekm upload.
Paths:
- Config: /data/adacam/config.json
- State: /data/adacam/forwarder_state.json
- DB: /data/recording/odc-api.db
- Images: /data/recording/cached_observations/
"""
import hashlib
import json
import logging
import mimetypes
import os
import signal
import sqlite3
import subprocess
import sys
import threading
import time
import traceback
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
try:
import requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
import redis
from flask import Flask, Response, jsonify, request, send_file
from PIL import Image
# =============================================================================
# Authentication Helpers (from adacam-api)
# =============================================================================
def get_device_serial() -> str:
"""Get device serial from liberate.sh-generated file or device_info."""
# Try dedicated serial file first
try:
return open('/data/adacam/device_serial').read().strip()
except:
pass
# Fall back to device_info
if device_info.get("serial"):
return device_info["serial"]
return 'unknown'
def get_api_token() -> str:
"""Derive API token from device serial (SHA256-based, matches liberate.sh output)."""
serial = get_device_serial()
return hashlib.sha256(f"adacam-api-{serial}-token".encode()).hexdigest()[:32]
def require_auth(f):
"""Decorator: require valid Bearer token for protected endpoints."""
from functools import wraps
@wraps(f)
def decorated(*args, **kwargs):
auth = request.headers.get('Authorization', '')
token = auth.replace('Bearer ', '').strip()
if token != get_api_token():
return json_response({'error': 'unauthorized'}, status=401)
return f(*args, **kwargs)
return decorated
# =============================================================================
# Configuration & Constants
# =============================================================================
VERSION = "adacam-odc-2.0.0"
PORT = int(os.environ.get("PORT", 5000))
# Paths (Bee device layout)
ODC_DB_PATH = Path(os.environ.get("ODC_DB_PATH", "/data/recording/odc-api.db"))
# Use glob to handle versioned filenames (e.g. fusion-v3-0-0.db, sensors-v3-0-0.db)
def _find_versioned_db(pattern, fallback):
import glob
candidates = sorted(glob.glob(pattern))
return Path(candidates[-1]) if candidates else Path(fallback)
SENSORS_DB_PATH = _find_versioned_db(
"/data/recording/redis_handler/sensors-v*.db",
os.environ.get("SENSORS_DB_PATH", "/data/recording/redis_handler/sensors-v3-0-0.db")
)
FUSION_DB_PATH = _find_versioned_db(
"/data/recording/redis_handler/fusion-v*.db",
os.environ.get("FUSION_DB_PATH", "/data/recording/redis_handler/fusion-v3-0-0.db")
)
IMAGES_DIR = Path(os.environ.get("IMAGES_DIR", "/data/recording/cached_observations"))
DASHCAM_CONFIG_PATH = Path(os.environ.get("DASHCAM_CONFIG_PATH", "/opt/dashcam/bin/config.json"))
FRAMEKM_DIR = Path("/data/recording/framekm")
# Preview constants
IMAGER_CONFIG_PATH = Path("/data/adacam/camera.config")
FRAMES_ROOT = Path("/tmp/recording/pics")
PREVIEW_STREAM_PORT = 9002
PREVIEW_TIMEOUT_SEC = 120 # auto-stop after 2 minutes (matches original odc-api)
# ADAcam config paths
ADACAM_CONFIG_PATH = Path(os.environ.get("ADACAM_CONFIG_PATH", "/data/adacam/config.json"))
ADACAM_STATE_PATH = Path(os.environ.get("ADACAM_STATE_PATH", "/data/adacam/forwarder_state.json"))
# Redis
REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
# Limits
GLOBAL_LIMIT = 5000
CLEANUP_INTERVAL_HOURS = 1
FRAMEKM_MAX_AGE_DAYS = 10 # Delete framekm files older than 10 days
DETECTION_KEEP_DAYS = 7 # Keep forwarded detections in SQLite for 7 days, then purge
DISK_PRESSURE_PCT = 90 # Aggressive cleanup if disk usage exceeds this %
DISK_PRESSURE_BATCH = 50 # Max files to delete per pressure run
# Default ADAcam config
DEFAULT_CONFIG = {
"device_id": "dashcam-unknown",
"adamaps_api": "https://api.adamaps.org",
"adamaps_key": "adamaps-ingest-2026",
"poll_interval": 30,
"batch_size": 100,
"image_batch_size": 10,
"min_confidence": 0.3,
"upload_images": True,
"forwarder_enabled": True,
}
# =============================================================================
# Logging
# =============================================================================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
stream=sys.stdout,
)
logger = logging.getLogger("adacam-odc")
fwd_logger = logging.getLogger("adacam-forwarder")
# =============================================================================
# Global State (Thread-Safe)
# =============================================================================
# Shutdown coordination
shutdown_event = threading.Event()
start_time = time.time()
# Thread-safe SQLite lock (for writes)
db_lock = threading.Lock()
# Shared configuration (read-only after load, thread-safe)
config: Dict[str, Any] = {}
# Forwarder state
forwarder_state: Dict[str, Any] = {
"last_detection_id": 0,
"last_image_id": 0,
"total_forwarded": 0,
"total_images": 0,
}
forwarder_state_lock = threading.Lock()
# Device info cache
device_info: Dict[str, Any] = {}
# Latest GNSS state (updated by GNSS poller or read from DB)
gnss_state: Dict[str, Any] = {
"latitude": None,
"longitude": None,
"altitude": None,
"speed": None,
"heading": None,
"timestamp": None,
"satellites": None,
"hdop": None,
"eph": None,
}
gnss_state_lock = threading.Lock()
# Preview state
preview_state = {
"active": False,
"started_at": None,
"timer": None,
}
# =============================================================================
# Configuration Management
# =============================================================================
def load_config() -> Dict[str, Any]:
"""Load configuration from file, falling back to defaults."""
global config
config = dict(DEFAULT_CONFIG)
# Ensure config directory exists
ADACAM_CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
if ADACAM_CONFIG_PATH.exists():
try:
with open(ADACAM_CONFIG_PATH) as f:
file_config = json.load(f)
config.update(file_config)
logger.info(f"Loaded config from {ADACAM_CONFIG_PATH}")
except Exception as e:
logger.warning(f"Failed to load config: {e}, using defaults")
else:
# Create default config file
try:
with open(ADACAM_CONFIG_PATH, "w") as f:
json.dump(DEFAULT_CONFIG, f, indent=2)
logger.info(f"Created default config at {ADACAM_CONFIG_PATH}")
except Exception as e:
logger.warning(f"Failed to create config file: {e}")
return config
def load_forwarder_state() -> Dict[str, Any]:
"""Load forwarder state from file."""
global forwarder_state
ADACAM_STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
if ADACAM_STATE_PATH.exists():
try:
with open(ADACAM_STATE_PATH) as f:
forwarder_state.update(json.load(f))
except Exception:
pass
return forwarder_state
def save_forwarder_state():
"""Save forwarder state to file."""
with forwarder_state_lock:
try:
with open(ADACAM_STATE_PATH, "w") as f:
json.dump(forwarder_state, f, indent=2)
except Exception as e:
fwd_logger.warning(f"Failed to save state: {e}")
# =============================================================================
# Database Helpers (Thread-Safe)
# =============================================================================
def get_db_connection(db_path: Path = ODC_DB_PATH) -> Optional[sqlite3.Connection]:
"""Get a SQLite connection with WAL mode and row factory."""
if not db_path.exists():
return None
try:
conn = sqlite3.connect(str(db_path), timeout=10.0, check_same_thread=False)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL;")
return conn
except sqlite3.Error as e:
logger.error(f"SQLite connection error: {e}")
return None
def query_db(
sql: str,
params: Tuple = (),
db_path: Path = ODC_DB_PATH,
one: bool = False,
) -> Any:
"""Execute a read query and return results as list of dicts."""
try:
conn = get_db_connection(db_path)
if not conn:
return None if one else []
cursor = conn.cursor()
cursor.execute(sql, params)
rows = cursor.fetchall()
conn.close()
if one:
return dict(rows[0]) if rows else None
return [dict(row) for row in rows]
except sqlite3.Error as e:
logger.error(f"SQLite error: {e} | Query: {sql[:100]}")
return None if one else []
# =============================================================================
# Redis Client
# =============================================================================
_redis_client: Optional[redis.Redis] = None
def get_redis() -> Optional[redis.Redis]:
"""Get Redis client (lazy init)."""
global _redis_client
if _redis_client is None:
try:
_redis_client = redis.Redis(
host=REDIS_HOST, port=REDIS_PORT, decode_responses=True
)
_redis_client.ping()
except redis.RedisError:
_redis_client = None
return _redis_client
def redis_get(key: str) -> Optional[str]:
"""Get a Redis key value."""
client = get_redis()
if client:
try:
return client.get(key)
except redis.RedisError:
return None
return None
# =============================================================================
# Device Info
# =============================================================================
def load_device_info() -> Dict[str, Any]:
"""Load device information from config and system."""
global device_info
info = {
"serial": None,
"firmware_version": VERSION,
"api_version": VERSION,
"camera_type": "Bee",
}
# Try dashcam config
if DASHCAM_CONFIG_PATH.exists():
try:
with open(DASHCAM_CONFIG_PATH, "r") as f:
dash_config = json.load(f)
info.update({
"serial": dash_config.get("SERIAL_NUMBER"),
"firmware_version": dash_config.get("FIRMWARE_VERSION", VERSION),
})
except Exception:
pass
# Try system serial
if not info["serial"]:
try:
import subprocess
result = subprocess.run(
["cat", "/sys/firmware/devicetree/base/serial-number"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
info["serial"] = result.stdout.strip().replace("\x00", "")
except Exception:
pass
# Use config device_id as fallback
if not info["serial"] and config.get("device_id"):
info["serial"] = config["device_id"]
device_info = info
return info
# =============================================================================
# GNSS Helpers
# =============================================================================
def fetch_latest_gnss() -> Optional[Dict[str, Any]]:
"""Fetch latest GNSS record from fusion/sensors database."""
for db_path in [FUSION_DB_PATH, SENSORS_DB_PATH]:
if not db_path.exists():
continue
result = query_db(
"""
SELECT
utc_time, latitude, longitude, altitude,
speed as estimated_speed, heading,
hdop, gdop, pdop, eph, satellites_used
FROM gnss
ORDER BY system_time DESC
LIMIT 1
""",
db_path=db_path,
one=True,
)
if result:
# Update shared GNSS state
with gnss_state_lock:
gnss_state["latitude"] = result.get("latitude")
gnss_state["longitude"] = result.get("longitude")
gnss_state["altitude"] = result.get("altitude")
gnss_state["heading"] = result.get("heading")
gnss_state["speed"] = result.get("estimated_speed")
gnss_state["timestamp"] = result.get("utc_time")
return result
return None
def fetch_gnss_history(limit: int = 100) -> List[Dict[str, Any]]:
"""Fetch recent GNSS history."""
for db_path in [FUSION_DB_PATH, SENSORS_DB_PATH]:
if not db_path.exists():
continue
result = query_db(
f"""
SELECT
utc_time as unix_timestamp, latitude, longitude, altitude,
speed as estimated_speed, heading, hdop, eph
FROM gnss
ORDER BY system_time DESC
LIMIT {limit}
""",
db_path=db_path,
)
if result:
return result
return []
# =============================================================================
# Landmark Queries
# =============================================================================
def fetch_landmarks_from_id(start_id: int, limit: int = GLOBAL_LIMIT) -> List[Dict[str, Any]]:
"""Fetch landmarks starting from ID (cursor-based pagination)."""
return query_db(
f"""
SELECT
id, ts, map_feature_id, framekm_id, image_name, image_id,
class_id, class_label, speed_label, speed_label_conf,
distance, x_center, y_center, lat, lon, alt, azimuth,
width, height, pitch, roll, yaw, confidence,
x1, y1, x2, y2, cam_lat, cam_lon, cam_heading,
track_id, attributes, model_id, model_hash
FROM landmarks
WHERE id >= ?
ORDER BY id ASC
LIMIT {limit}
""",
(start_id,),
)
def fetch_landmarks_with_range(
since: Optional[int] = None,
until: Optional[int] = None,
limit: Optional[int] = None,
) -> List[Dict[str, Any]]:
"""Fetch landmarks with optional time range filter."""
conditions = []
params = []
if since is not None:
conditions.append("ts >= ?")
params.append(since)
if until is not None:
conditions.append("ts <= ?")
params.append(until)
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
limit_clause = f"LIMIT {limit}" if limit else ""
return query_db(
f"""
SELECT
id, ts, map_feature_id, framekm_id, image_name, image_id,
class_id, class_label, speed_label, speed_label_conf,
distance, x_center, y_center, lat, lon, alt, azimuth,
width, height, pitch, roll, yaw, confidence,
x1, y1, x2, y2, cam_lat, cam_lon, cam_heading,
track_id, attributes, model_id, model_hash
FROM landmarks
{where_clause}
ORDER BY id DESC
{limit_clause}
""",
tuple(params),
)
def fetch_last_n_landmarks(n: int) -> List[Dict[str, Any]]:
"""Fetch last N landmarks."""
return query_db(
f"""
SELECT
id, ts, map_feature_id, framekm_id, image_name, image_id,
class_id, class_label, speed_label, speed_label_conf,
distance, x_center, y_center, lat, lon, alt, azimuth,
width, height, pitch, roll, yaw, confidence,
x1, y1, x2, y2, cam_lat, cam_lon, cam_heading,
track_id, attributes, model_id, model_hash
FROM landmarks
ORDER BY id DESC
LIMIT {n}
""",
)
def fetch_latest_landmark() -> Optional[Dict[str, Any]]:
"""Fetch most recent landmark."""
return query_db(
"""
SELECT
id, ts, map_feature_id, framekm_id, image_name, image_id,
class_id, class_label, speed_label, speed_label_conf,
distance, x_center, y_center, lat, lon, alt, azimuth,
width, height, pitch, roll, yaw, confidence,
x1, y1, x2, y2, cam_lat, cam_lon, cam_heading,
track_id, attributes, model_id, model_hash
FROM landmarks
ORDER BY id DESC
LIMIT 1
""",
one=True,
)
def fetch_landmark_by_id(landmark_id: int) -> Optional[Dict[str, Any]]:
"""Fetch a specific landmark by ID."""
return query_db(
"SELECT * FROM landmarks WHERE id = ?",
(landmark_id,),
one=True,
)
def fetch_observations_by_landmark(map_feature_id: int) -> List[Dict[str, Any]]:
"""Fetch observations for a landmark/map_feature."""
return query_db(
"""
SELECT
id, ts, image_name, x1, y1, x2, y2,
confidence, cam_lat, cam_lon, cam_heading
FROM landmarks
WHERE map_feature_id = ?
ORDER BY ts ASC
""",
(map_feature_id,),
)
# =============================================================================
# Image Chip Handler
# =============================================================================
def crop_image_chip(image_path: str, x1: int, y1: int, x2: int, y2: int) -> bytes:
"""Crop an image to bounding box and return JPEG bytes."""
img = Image.open(image_path)
left = max(0, int(x1))
top = max(0, int(y1))
right = min(img.width, int(x2))
bottom = min(img.height, int(y2))
if right <= left:
right = left + 1
if bottom <= top:
bottom = top + 1
cropped = img.crop((left, top, right, bottom))
buffer = BytesIO()
cropped.save(buffer, format="JPEG", quality=85)
buffer.seek(0)
return buffer.getvalue()
# =============================================================================
# Forwarder: API Communication
# =============================================================================
def post_detections(detections: List[Dict]) -> Tuple[bool, Optional[str]]:
"""POST detections to ADAMaps API."""
url = f"{config['adamaps_api'].rstrip('/')}/api/ingest"
payload = {"device_id": config["device_id"], "detections": detections}
headers = {
"X-AdaMaps-Key": config["adamaps_key"],
"Content-Type": "application/json",
}
if HAS_REQUESTS:
try:
r = requests.post(url, json=payload, headers=headers, timeout=30)
if r.status_code == 200:
return True, None
elif r.status_code == 429:
return False, "rate_limited"
return False, f"HTTP {r.status_code}: {r.text[:200]}"
except requests.Timeout:
return False, "timeout"
except requests.RequestException as e:
return False, str(e)
else:
try:
data = json.dumps(payload).encode("utf-8")
req = Request(url, data=data, headers=headers, method="POST")
with urlopen(req, timeout=30) as resp:
return (True, None) if resp.status == 200 else (False, f"HTTP {resp.status}")
except HTTPError as e:
return (False, "rate_limited") if e.code == 429 else (False, f"HTTP {e.code}")
except URLError as e:
return False, str(e.reason)
except Exception as e:
return False, str(e)
def upload_image(detection_id: int, image_path: str) -> Tuple[bool, Optional[str]]:
"""Upload image to ADAMaps API."""
url = f"{config['adamaps_api'].rstrip('/')}/api/images"
device_id = config["device_id"]
if not os.path.exists(image_path):
return False, "file_not_found"
content_type = mimetypes.guess_type(image_path)[0] or "image/jpeg"
if HAS_REQUESTS:
try:
with open(image_path, "rb") as f:
r = requests.post(
url,
files={"image": (os.path.basename(image_path), f, content_type)},
data={"detection_id": detection_id, "device_id": device_id},
headers={"X-AdaMaps-Key": config["adamaps_key"]},
timeout=60,
)
if r.status_code == 200:
return True, None
elif r.status_code == 429:
return False, "rate_limited"
return False, f"HTTP {r.status_code}"
except Exception as e:
return False, str(e)
else:
try:
boundary = "Boundary" + hashlib.md5(str(time.time()).encode()).hexdigest()[:16]
with open(image_path, "rb") as f:
image_data = f.read()
body_parts = [
f"--{boundary}\r\nContent-Disposition: form-data; name=\"detection_id\"\r\n\r\n{detection_id}".encode(),
f"--{boundary}\r\nContent-Disposition: form-data; name=\"device_id\"\r\n\r\n{device_id}".encode(),
(
f"--{boundary}\r\nContent-Disposition: form-data; name=\"image\"; "
f'filename="{os.path.basename(image_path)}"\r\nContent-Type: {content_type}\r\n\r\n'
).encode() + image_data,
f"--{boundary}--\r\n".encode(),
]
body_bytes = b"\r\n".join(body_parts)
req = Request(
url,
data=body_bytes,
headers={
"X-AdaMaps-Key": config["adamaps_key"],
"Content-Type": f"multipart/form-data; boundary={boundary}",
},
method="POST",
)
with urlopen(req, timeout=60) as resp:
return (True, None) if resp.status == 200 else (False, f"HTTP {resp.status}")
except HTTPError as e:
return False, f"HTTP {e.code}"
except Exception as e:
return False, str(e)
# =============================================================================
# Forwarder: Detection Queries
# =============================================================================
def fetch_new_detections_for_forwarding(
last_id: int, min_conf: float, limit: int
) -> List[Dict[str, Any]]:
"""Fetch new detections for forwarding to ADAMaps."""
# Note: odc-api uses 'negative_class' column but our schema may not have it
# Fall back gracefully
try:
return query_db(
"""
SELECT id, ts, image_name, class_label, confidence, lat, lon,
cam_lat, cam_lon, cam_heading,
x1, y1, x2, y2,
attributes, azimuth, map_feature_id,
width, height, speed_label, speed_label_conf
FROM landmarks
WHERE id > ?
AND confidence >= ?
AND lat IS NOT NULL
AND lon IS NOT NULL
AND lat != 0
AND lon != 0
ORDER BY id ASC
LIMIT ?
""",
(last_id, min_conf, limit),
)
except Exception as e:
fwd_logger.warning(f"Detection query failed: {e}")
return []
def fetch_pending_images_for_upload(
last_image_id: int, max_det_id: int, min_conf: float, limit: int
) -> List[Dict[str, Any]]:
"""Fetch detections with images that haven't been uploaded yet."""
try:
return query_db(
"""
SELECT id, image_name
FROM landmarks
WHERE id > ?
AND id <= ?
AND confidence >= ?
AND image_name IS NOT NULL
AND image_name != ''
ORDER BY id ASC
LIMIT ?
""",
(last_image_id, max_det_id, min_conf, limit),
)
except Exception as e:
fwd_logger.warning(f"Image fetch query failed: {e}")
return []
# =============================================================================
# Forwarder: Processing Cycle
# =============================================================================
def purge_forwarded_detections():
"""
Delete old forwarded detections from the Bee's SQLite DB.
Only purges rows that:
1. Have already been forwarded (id <= last_detection_id cursor)
2. Are older than DETECTION_KEEP_DAYS
Keeps recent data intact. Safe to call after each successful upload cycle.
"""
keep_ms = DETECTION_KEEP_DAYS * 86400 * 1000
cutoff_ms = int(time.time() * 1000) - keep_ms
with forwarder_state_lock:
cursor_id = forwarder_state.get("last_detection_id", 0)
if cursor_id == 0:
return 0
try:
db_path = ODC_DB_PATH
if not db_path.exists():
return 0
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute(
"DELETE FROM landmarks WHERE id <= ? AND ts < ?",
(cursor_id, cutoff_ms)
)
deleted = cur.rowcount
conn.commit()
conn.close()
if deleted > 0:
fwd_logger.info(f"[purge] Deleted {deleted} forwarded detections older than {DETECTION_KEEP_DAYS}d from SQLite")
return deleted
except Exception as e:
fwd_logger.warning(f"[purge] Detection purge failed: {e}")
return 0
def purge_uploaded_images():
"""
Delete image files from IMAGES_DIR that have already been uploaded to ADAMaps.
Only deletes files:
1. Whose detection id <= last_image_id cursor (already uploaded)
2. Older than DETECTION_KEEP_DAYS
Gets the list of uploaded image filenames from SQLite, then removes the files.
"""
keep_ms = DETECTION_KEEP_DAYS * 86400 * 1000
cutoff_ms = int(time.time() * 1000) - keep_ms
with forwarder_state_lock:
cursor_img_id = forwarder_state.get("last_image_id", 0)
if cursor_img_id == 0:
return 0
deleted = 0
try:
db_path = ODC_DB_PATH
if not db_path.exists():
return 0
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute(
"SELECT image_name FROM landmarks WHERE id <= ? AND ts < ? AND image_name IS NOT NULL",
(cursor_img_id, cutoff_ms)
)
rows = cur.fetchall()
conn.close()
for row in rows:
img_path = IMAGES_DIR / row["image_name"]
try:
if img_path.exists():
img_path.unlink()
deleted += 1
except Exception as e:
fwd_logger.debug(f"[purge] Could not delete image {img_path}: {e}")
if deleted > 0:
fwd_logger.info(f"[purge] Deleted {deleted} uploaded images older than {DETECTION_KEEP_DAYS}d from {IMAGES_DIR}")
return deleted
except Exception as e:
fwd_logger.warning(f"[purge] Image purge failed: {e}")
return 0
def forwarder_process_cycle() -> Tuple[int, int]:
"""
One processing cycle: forward new detections + upload images.
Returns (detections_sent, images_uploaded).
"""
min_conf = config.get("min_confidence", 0.3)
batch_size = config.get("batch_size", 100)
img_limit = config.get("image_batch_size", 10)
upload_imgs = config.get("upload_images", True)
with forwarder_state_lock:
last_det_id = forwarder_state.get("last_detection_id", 0)
last_img_id = forwarder_state.get("last_image_id", 0)
det_sent = 0
img_uploaded = 0
# ── 1. Forward detections ─────────────────────────────────────────────────
rows = fetch_new_detections_for_forwarding(last_det_id, min_conf, batch_size * 5)
if rows:
fwd_logger.info(f"Found {len(rows)} new detections (cursor id>{last_det_id})")
for i in range(0, len(rows), batch_size):
if shutdown_event.is_set():
break
batch_rows = rows[i : i + batch_size]
detections = []
for row in batch_rows:
det = {
"id": str(row["id"]),
"ts": int(row["ts"]) if row["ts"] else int(time.time() * 1000),
"lat": float(row["lat"]),
"lon": float(row["lon"]),
"class_label": row["class_label"] or "unknown",
"overall_confidence": float(row["confidence"]) if row["confidence"] else 0.5,
"device_id": config["device_id"],
}
# Add optional fields if present
if row.get("x1") is not None:
det["bbox_x1"] = float(row["x1"])
if row.get("y1") is not None:
det["bbox_y1"] = float(row["y1"])
if row.get("x2") is not None:
det["bbox_x2"] = float(row["x2"])
if row.get("y2") is not None:
det["bbox_y2"] = float(row["y2"])
if row.get("cam_lat") is not None:
det["cam_lat"] = float(row["cam_lat"])
if row.get("cam_lon") is not None:
det["cam_lon"] = float(row["cam_lon"])
if row.get("cam_heading") is not None:
det["cam_heading"] = float(row["cam_heading"])
if row.get("azimuth") is not None:
det["azimuth"] = float(row["azimuth"])
if row.get("map_feature_id") is not None:
det["map_feature_id"] = int(row["map_feature_id"])
if row.get("width") is not None:
det["width"] = float(row["width"])
if row.get("height") is not None:
det["height"] = float(row["height"])
if row.get("speed_label") is not None:
det["speed_label"] = int(row["speed_label"])
if row.get("speed_label_conf") is not None:
det["speed_label_conf"] = float(row["speed_label_conf"])
# Parse and forward attributes JSON if present
if row.get("attributes"):
try:
det["attributes"] = json.loads(row["attributes"])
except Exception:
pass
detections.append(det)
ok, err = post_detections(detections)
if ok:
det_sent += len(batch_rows)
with forwarder_state_lock:
forwarder_state["last_detection_id"] = int(batch_rows[-1]["id"])
fwd_logger.info(
f'Sent batch of {len(batch_rows)} detections '
f'(id {batch_rows[0]["id"]}{batch_rows[-1]["id"]})'
)
else:
fwd_logger.warning(f"Failed to send batch: {err}")
if err == "rate_limited":
time.sleep(10)
break # Stop on failure, retry next cycle
save_forwarder_state()
time.sleep(0.3)
# ── 2. Upload images ──────────────────────────────────────────────────────
if upload_imgs and not shutdown_event.is_set():
with forwarder_state_lock:
current_last_det_id = forwarder_state.get("last_detection_id", 0)
last_img_id = forwarder_state.get("last_image_id", 0)
if last_img_id < current_last_det_id:
img_rows = fetch_pending_images_for_upload(
last_img_id, current_last_det_id, min_conf, img_limit
)
for row in img_rows:
if shutdown_event.is_set():
break
img_path = IMAGES_DIR / row["image_name"]
if not img_path.exists():
# Mark as attempted
with forwarder_state_lock:
forwarder_state["last_image_id"] = max(
forwarder_state.get("last_image_id", 0), int(row["id"])
)
continue
ok, err = upload_image(row["id"], str(img_path))
if ok:
img_uploaded += 1
with forwarder_state_lock:
forwarder_state["last_image_id"] = max(
forwarder_state.get("last_image_id", 0), int(row["id"])
)
fwd_logger.debug(f'Uploaded image for detection {row["id"]}')
elif err == "rate_limited":
fwd_logger.warning("Image upload rate limited, backing off")
time.sleep(5)
break
else:
fwd_logger.warning(f'Image upload failed for {row["id"]}: {err}')
with forwarder_state_lock:
forwarder_state["last_image_id"] = max(
forwarder_state.get("last_image_id", 0), int(row["id"])
)
if img_uploaded > 0:
fwd_logger.info(f"Uploaded {img_uploaded} images")
save_forwarder_state()
# ── 3. Purge old forwarded detections + images ───────────────────────────
if det_sent > 0 or img_uploaded > 0:
purge_forwarded_detections()
purge_uploaded_images()
return det_sent, img_uploaded
# =============================================================================
# Thread 2: Forwarder Worker
# =============================================================================
def is_internet_available(host: str = "8.8.8.8", port: int = 53, timeout: int = 3) -> bool:
"""Quick internet connectivity check — DNS port probe, no HTTP overhead."""
import socket
try:
socket.setdefaulttimeout(timeout)
socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((host, port))
return True
except Exception:
return False
def forwarder_thread_main():
"""Main function for forwarder thread."""
fwd_logger.info("╔════════════════════════════════════════════════════════════╗")
fwd_logger.info("║ ADAcam Forwarder Thread Starting ║")
fwd_logger.info("╚════════════════════════════════════════════════════════════╝")
fwd_logger.info(f" Device ID: {config['device_id']}")
fwd_logger.info(f" ADAMaps API: {config['adamaps_api']}")
fwd_logger.info(f" Poll interval: {config['poll_interval']}s")
fwd_logger.info(f" Min confidence: {config['min_confidence']}")
with forwarder_state_lock:
fwd_logger.info(f" Cursor at id: {forwarder_state.get('last_detection_id', 0)}")
fwd_logger.info(f" Total sent: {forwarder_state.get('total_forwarded', 0)} detections")
backoff_seconds = config.get("poll_interval", 30)
max_backoff = 300 # 5 minutes max
while not shutdown_event.is_set():
try:
if not is_internet_available():
fwd_logger.debug("No internet connectivity — skipping cycle, will retry")
for _ in range(min(backoff_seconds, max_backoff)):
if shutdown_event.is_set():
break
time.sleep(1)
backoff_seconds = min(backoff_seconds * 2, max_backoff)
continue
det_sent, img_uploaded = forwarder_process_cycle()
if det_sent > 0 or img_uploaded > 0:
with forwarder_state_lock:
forwarder_state["total_forwarded"] = (
forwarder_state.get("total_forwarded", 0) + det_sent
)
forwarder_state["total_images"] = (
forwarder_state.get("total_images", 0) + img_uploaded
)
save_forwarder_state()
fwd_logger.info(
f"Cycle complete: {det_sent} detections, {img_uploaded} images "
f"(total: {forwarder_state['total_forwarded']} det, "
f"{forwarder_state['total_images']} img)"
)
# Reset backoff on success
backoff_seconds = config.get("poll_interval", 30)
else:
# No data, use normal poll interval
backoff_seconds = config.get("poll_interval", 30)
except Exception as e:
fwd_logger.error(f"Forwarder cycle error: {e}")
fwd_logger.debug(traceback.format_exc())
# Exponential backoff on error
backoff_seconds = min(backoff_seconds * 2, max_backoff)
fwd_logger.info(f"Backing off for {backoff_seconds}s")
# Sleep in chunks for responsive shutdown
for _ in range(int(backoff_seconds)):
if shutdown_event.is_set():
break
time.sleep(1)
fwd_logger.info("Forwarder thread stopped")
def start_forwarder_thread() -> Optional[threading.Thread]:
"""Start the forwarder thread with auto-restart on crash."""
if not config.get("forwarder_enabled", True):
logger.info("Forwarder disabled in config")
return None
def wrapper():
while not shutdown_event.is_set():
try:
forwarder_thread_main()
except Exception as e:
fwd_logger.error(f"Forwarder thread crashed: {e}")
fwd_logger.debug(traceback.format_exc())
if not shutdown_event.is_set():
fwd_logger.info("Restarting forwarder thread in 10s...")
time.sleep(10)
thread = threading.Thread(target=wrapper, name="forwarder", daemon=True)
thread.start()
return thread
# =============================================================================
# Background Workers
# =============================================================================
def get_disk_usage_pct(path="/data"):
"""Return disk usage percentage for the given path."""
try:
st = os.statvfs(path)
total = st.f_blocks * st.f_frsize
free = st.f_bavail * st.f_frsize
if total == 0:
return 0
return int((total - free) / total * 100)
except Exception:
return 0
def cleanup_old_framekms():
"""Delete framekm files older than FRAMEKM_MAX_AGE_DAYS.
Our replacement for hivemapper-folder-purger — pure local disk management, no phone-home."""
if not FRAMEKM_DIR.exists():
return
now = time.time()
max_age_seconds = FRAMEKM_MAX_AGE_DAYS * 86400
deleted_count = 0
for root, dirs, files in os.walk(FRAMEKM_DIR):
for fname in files:
fpath = os.path.join(root, fname)
try:
if now - os.path.getmtime(fpath) > max_age_seconds:
os.remove(fpath)
deleted_count += 1
except OSError:
pass
if deleted_count > 0:
logger.info(f"[cleanup] Age-based: removed {deleted_count} framekm files (>{FRAMEKM_MAX_AGE_DAYS}d old)")
def cleanup_disk_pressure():
"""Emergency cleanup when disk is critically full.
Deletes oldest framekm files first, up to DISK_PRESSURE_BATCH per run."""
if not FRAMEKM_DIR.exists():
return
usage = get_disk_usage_pct("/data")
if usage < DISK_PRESSURE_PCT:
return
logger.warning(f"[cleanup] Disk pressure: {usage}% used — emergency cleanup running")
all_files = []
for root, dirs, files in os.walk(FRAMEKM_DIR):
for fname in files:
fpath = os.path.join(root, fname)
try:
all_files.append((os.path.getmtime(fpath), fpath))
except OSError:
pass
all_files.sort() # oldest first
deleted = 0
for _, fpath in all_files[:DISK_PRESSURE_BATCH]:
try:
os.remove(fpath)
deleted += 1
except OSError:
pass
if deleted:
usage_after = get_disk_usage_pct("/data")
logger.info(f"[cleanup] Pressure cleanup: removed {deleted} files, disk now {usage_after}%")
def cleanup_worker():
"""Background cleanup thread — replaces hivemapper-folder-purger entirely."""
while not shutdown_event.is_set():
try:
cleanup_old_framekms()
cleanup_disk_pressure()
except Exception as e:
logger.error(f"Cleanup error: {e}")
for _ in range(CLEANUP_INTERVAL_HOURS * 3600):
if shutdown_event.is_set():
break
time.sleep(1)
# =============================================================================
# System Initialization
# =============================================================================
def system_init():
"""Run system initialization tasks."""
logger.info("Running system initialization...")
# Set swappiness
try:
import subprocess
subprocess.run(
["sysctl", "-w", "vm.swappiness=10"],
check=False, capture_output=True, timeout=5
)
logger.info("Set vm.swappiness=10")
except Exception as e:
logger.warning(f"Failed to set swappiness: {e}")
# Start IMU calibrator (fire and forget)
imu_calibrator = "/opt/dashcam/bin/imucalibrator"
if os.path.exists(imu_calibrator):
try:
import subprocess
subprocess.Popen(
[imu_calibrator],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
logger.info("Started IMU calibrator")
except Exception as e:
logger.warning(f"Failed to start IMU calibrator: {e}")
# =============================================================================
# Preview Helpers
# =============================================================================
def stop_preview_impl(): # DISABLED
pass
def _stop_preview_impl_orig():
"""Stop preview mode — restore normal camera config and restart camera-bridge."""
preview_state["active"] = False
preview_state["started_at"] = None
if preview_state["timer"]:
preview_state["timer"].cancel()
preview_state["timer"] = None
try:
subprocess.run(["systemctl", "stop", "camera-bridge"], check=False)
time.sleep(1)
subprocess.run(["systemctl", "start", "camera-bridge"], check=False)
except Exception as e:
logger.error(f"[preview] stop error: {e}")
try:
subprocess.run(["rm", "-rf", "/tmp/recording/preview"], check=False)
except Exception:
pass
logger.info("[preview] stopped, camera-bridge restored")
def start_preview_impl(): # DISABLED
pass
def _start_preview_impl_orig():
"""Start preview mode — switch camera to preview config, restart camera-bridge.
Auto-stops after PREVIEW_TIMEOUT_SEC seconds."""
# Cancel any existing timer
if preview_state["timer"]:
preview_state["timer"].cancel()
preview_state["active"] = True
preview_state["started_at"] = time.time()
# Schedule auto-stop
t = threading.Timer(PREVIEW_TIMEOUT_SEC, stop_preview_impl)
t.daemon = True
t.start()
preview_state["timer"] = t
try:
subprocess.run(["mkdir", "-p", "/tmp/recording/preview"], check=False)
except Exception:
pass
try:
subprocess.run(["systemctl", "stop", "camera-bridge"], check=False)
time.sleep(1)
subprocess.run(["systemctl", "start", "camera-bridge"], check=False)
except Exception as e:
logger.error(f"[preview] start error: {e}")
logger.info("[preview] started, auto-stop in %ds", PREVIEW_TIMEOUT_SEC)
# =============================================================================
# Flask Application (Thread 1)
# =============================================================================
app = Flask(__name__)
app.config["JSON_SORT_KEYS"] = False
def json_response(data: Any, status: int = 200) -> Response:
"""Create a JSON response."""
return Response(
json.dumps(data, default=str),
status=status,
mimetype="application/json",
)
# -----------------------------------------------------------------------------
# Health & Info Routes
# -----------------------------------------------------------------------------
@app.route("/health", methods=["GET"])
@app.route("/api/1/health", methods=["GET"])
def health():
"""Health check endpoint."""
return json_response({"status": "ok", "version": VERSION})
@app.route("/ping", methods=["GET"])
@app.route("/api/1/ping", methods=["GET"])
def ping():
"""Ping endpoint with uptime."""
uptime = int(time.time() - start_time)
with forwarder_state_lock:
fwd_total = forwarder_state.get("total_forwarded", 0)
return json_response({
"healthy": True,
"uptime": uptime,
"cameraTime": int(time.time() * 1000),
"forwarder_total": fwd_total,
})
@app.route("/info", methods=["GET"])
@app.route("/api/1/info", methods=["GET"])
def info():
"""Device info endpoint."""
return json_response({
"serial": device_info.get("serial"),
"firmware_version": device_info.get("firmware_version"),
"api_version": VERSION,
"camera_type": device_info.get("camera_type", "Bee"),
"device_id": config.get("device_id"),
})
# -----------------------------------------------------------------------------
# Forwarder Status Route
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# Pairing & Device Config Routes (from adacam-api)
# -----------------------------------------------------------------------------
@app.route("/pair", methods=["GET"])
@app.route("/api/1/pair", methods=["GET"])
def pair():
"""Pairing info for Varroa app. Unauthenticated."""
serial = get_device_serial()
return json_response({
'serial': serial,
'version': VERSION,
'ap_ip': '10.77.0.1',
'api_port': PORT
})
@app.route("/api/1/wifi/status", methods=["GET"])
def wifi_status():
"""Get WiFi client interface status (unauthenticated)."""
import subprocess
try:
result = subprocess.run(
['wpa_cli', '-i', 'wlp1s0f1', 'status'],
capture_output=True, text=True, timeout=5
)
lines = dict(
l.split('=', 1) for l in result.stdout.strip().split('\n') if '=' in l
)
return json_response({
'ssid': lines.get('ssid', ''),
'ip': lines.get('ip_address', ''),
'state': lines.get('wpa_state', 'DISCONNECTED'),
'connected': lines.get('wpa_state') == 'COMPLETED'
})
except Exception as e:
return json_response({'error': str(e), 'connected': False})
@app.route("/api/1/wifi/connect", methods=["POST"])
@require_auth
def wifi_connect():
"""Connect to a WiFi network (authenticated)."""
import subprocess
data = request.get_json() or {}
ssid = data.get('ssid', '').strip()
password = data.get('password', '').strip()
if not ssid or not password:
return json_response({'error': 'ssid and password required'}, status=400)
try:
result = subprocess.run(
['/usr/local/bin/adacam-wifi-connect', ssid, password],
capture_output=True, text=True, timeout=15
)
if result.returncode == 0:
return json_response({'ok': True, 'message': f'Connecting to {ssid}'})
else:
return json_response({'error': result.stderr or 'Failed'}, status=500)
except Exception as e:
return json_response({'error': str(e)}, status=500)
@app.route("/api/1/ssh/status", methods=["GET"])
def ssh_status():
"""Get SSH daemon status (unauthenticated)."""
import subprocess
result = subprocess.run(
['systemctl', 'is-active', 'sshd'],
capture_output=True, text=True
)
return json_response({'active': result.stdout.strip() == 'active'})
@app.route("/api/1/ssh/toggle", methods=["POST"])
@require_auth
def ssh_toggle():
"""Enable/disable SSH (authenticated)."""
import subprocess
data = request.get_json() or {}
enable = data.get('enable', True)
subprocess.run(
['systemctl', 'start' if enable else 'stop', 'sshd'],
timeout=5
)
return json_response({'ok': True, 'ssh_enabled': enable})
@app.route("/api/1/config", methods=["GET"])
def get_config():
"""Get current configuration (unauthenticated, sensitive values masked)."""
return json_response({
'device_id': config.get('device_id'),
'adamaps_api': config.get('adamaps_api'),
'poll_interval': config.get('poll_interval'),
'forwarder_enabled': config.get('forwarder_enabled'),
'version': VERSION,
})
@app.route("/api/1/config", methods=["POST"])
@require_auth
def set_config():
"""Update configuration (authenticated)."""
data = request.get_json() or {}
allowed_keys = ['poll_interval', 'batch_size', 'min_confidence', 'upload_images', 'forwarder_enabled']
updated = []
for key in allowed_keys:
if key in data:
config[key] = data[key]
updated.append(key)
if updated:
try:
with open(ADACAM_CONFIG_PATH, "w") as f:
json.dump(config, f, indent=2)
except Exception as e:
return json_response({'error': f'Failed to save config: {e}'}, status=500)
return json_response({'ok': True, 'updated': updated})
@app.route("/forwarder/status", methods=["GET"])
@app.route("/api/1/forwarder/status", methods=["GET"])
def forwarder_status():
"""Forwarder status and statistics."""
with forwarder_state_lock:
state_copy = dict(forwarder_state)
return json_response({
"enabled": config.get("forwarder_enabled", True),
"adamaps_api": config.get("adamaps_api"),
"device_id": config.get("device_id"),
"last_detection_id": state_copy.get("last_detection_id", 0),
"last_image_id": state_copy.get("last_image_id", 0),
"total_forwarded": state_copy.get("total_forwarded", 0),
"total_images": state_copy.get("total_images", 0),
})
# -----------------------------------------------------------------------------
# ML Status Routes
# -----------------------------------------------------------------------------
@app.route("/ml/status", methods=["GET"])
@app.route("/api/1/ml/status", methods=["GET"])
def ml_status():
"""ML pipeline status (checks Redis MAP_AI_READY key)."""
ready = redis_get("MAP_AI_READY")
return json_response({
"ready": ready == "1" or ready == "true" or ready == "True",
"key": "MAP_AI_READY",
"value": ready,
})
# -----------------------------------------------------------------------------
# GPS/Position Routes
# -----------------------------------------------------------------------------
@app.route("/gps", methods=["GET"])
@app.route("/api/1/gps", methods=["GET"])
def gps():
"""Latest GPS fix."""
data = fetch_latest_gnss()
return json_response(data if data else {})
@app.route("/gps/history", methods=["GET"])
@app.route("/api/1/gps/history", methods=["GET"])
def gps_history():
"""Recent GPS track."""
limit = request.args.get("limit", 100, type=int)
data = fetch_gnss_history(limit=min(limit, 1000))
return json_response(data)
@app.route("/position", methods=["GET"])
@app.route("/position/latest", methods=["GET"])
@app.route("/api/1/position", methods=["GET"])
@app.route("/api/1/position/latest", methods=["GET"])
def position():
"""Current position (lat/lon/heading/speed)."""
data = fetch_latest_gnss()
if data:
return json_response({
"latitude": data.get("latitude"),
"longitude": data.get("longitude"),
"altitude": data.get("altitude"),
"heading": data.get("heading"),
"speed": data.get("estimated_speed"),
"timestamp": data.get("utc_time"),
})
return json_response({})
@app.route("/gnssConcise", methods=["GET"])
@app.route("/api/1/gnssConcise", methods=["GET"])
def gnss_concise():
"""Concise GNSS state."""
data = fetch_latest_gnss()
return json_response(data if data else {})
# -----------------------------------------------------------------------------
# Landmarks Routes
# -----------------------------------------------------------------------------
@app.route("/landmarks/", methods=["GET"])
@app.route("/api/1/landmarks/", methods=["GET"])
def landmarks_list():
"""List landmarks with optional time filter."""
since = request.args.get("since", type=int)
until = request.args.get("until", type=int)
limit = GLOBAL_LIMIT if since is None and until is None else None
data = fetch_landmarks_with_range(since=since, until=until, limit=limit)
return json_response(data)
@app.route("/landmarks/id/<int:landmark_id>", methods=["GET"])
@app.route("/api/1/landmarks/id/<int:landmark_id>", methods=["GET"])
def landmarks_from_id(landmark_id: int):
"""Fetch landmarks starting from ID (cursor-based pagination)."""
limit = request.args.get("limit", GLOBAL_LIMIT, type=int)
data = fetch_landmarks_from_id(landmark_id, limit=min(limit, GLOBAL_LIMIT))
return json_response(data)
@app.route("/landmarks/latest", methods=["GET"])
@app.route("/api/1/landmarks/latest", methods=["GET"])
def landmarks_latest():
"""Most recent landmark."""
data = fetch_latest_landmark()
if data:
return json_response({"last_landmark": data})
return json_response({"error": "No landmarks found"}, status=404)
@app.route("/landmarks/last/<int:n>", methods=["GET"])
@app.route("/api/1/landmarks/last/<int:n>", methods=["GET"])
def landmarks_last_n(n: int):
"""Last N landmarks."""
data = fetch_last_n_landmarks(min(n, GLOBAL_LIMIT))
return json_response(data)
@app.route("/landmarks/boundingBox/<int:landmark_id>", methods=["GET"])
@app.route("/api/1/landmarks/boundingBox/<int:landmark_id>", methods=["GET"])
def landmarks_bbox(landmark_id: int):
"""Bounding box coordinates for a landmark."""
data = fetch_landmark_by_id(landmark_id)
if not data:
return json_response({"error": "Landmark not found"}, status=404)
return json_response({
"boundingBox": {
"x1": data.get("x1"),
"y1": data.get("y1"),
"x2": data.get("x2"),
"y2": data.get("y2"),
},
"confidence": data.get("confidence"),
"ts": data.get("ts"),
"lat": data.get("cam_lat"),
"lon": data.get("cam_lon"),
"heading": data.get("cam_heading"),
})
@app.route("/landmarks/<int:landmark_id>/chips", methods=["GET"])
@app.route("/api/1/landmarks/<int:landmark_id>/chips", methods=["GET"])
def landmarks_chips_list(landmark_id: int):
"""List available image chips for a landmark."""
landmark = fetch_landmark_by_id(landmark_id)
if not landmark:
return json_response({"error": "Landmark not found"}, status=404)
map_feature_id = landmark.get("map_feature_id") or landmark_id
observations = fetch_observations_by_landmark(map_feature_id)
if not observations:
observations = [landmark]
chips = []
for obs in observations:
image_name = obs.get("image_name")
if image_name:
image_path = IMAGES_DIR / image_name
if image_path.exists():
chips.append({
"chip_id": obs.get("id"),
"path": f"/landmarks/{landmark_id}/chips/{obs.get('id')}",
"image_name": image_name,
})
if not chips:
return json_response(
{"error": f"No images found for landmark {landmark_id}"}, status=404
)
return json_response(chips[-1:])
@app.route("/landmarks/<int:landmark_id>/chips/<int:chip_id>", methods=["GET"])
@app.route("/api/1/landmarks/<int:landmark_id>/chips/<int:chip_id>", methods=["GET"])
def landmarks_chip_image(landmark_id: int, chip_id: int):
"""Serve a cropped image chip for a landmark observation."""
obs = fetch_landmark_by_id(chip_id)
if not obs:
return Response("Observation not found", status=404)
image_name = obs.get("image_name")
if not image_name:
return Response("No image for observation", status=404)
image_path = IMAGES_DIR / image_name
if not image_path.exists():
return Response("Image file not found", status=404)
x1 = obs.get("x1", 0)
y1 = obs.get("y1", 0)
x2 = obs.get("x2", 100)
y2 = obs.get("y2", 100)
try:
jpeg_bytes = crop_image_chip(str(image_path), x1, y1, x2, y2)
return Response(jpeg_bytes, mimetype="image/jpeg")
except Exception as e:
logger.error(f"Failed to crop image: {e}")
return Response("Failed to process image", status=500)
# =============================================================================
# Preview Routes
# =============================================================================
@app.route('/preview/start')
@app.route('/api/1/preview/start')
def preview_start():
start_preview_impl()
return jsonify({"status": "started", "stream": f"http://{request.host.split(':')[0]}:{PREVIEW_STREAM_PORT}/?action=stream"})
@app.route('/preview/stop')
@app.route('/api/1/preview/stop')
def preview_stop():
stop_preview_impl()
return jsonify({"status": "stopped"})
@app.route('/preview/status')
@app.route('/api/1/preview/status')
def preview_status():
elapsed = None
remaining = None
if preview_state["active"] and preview_state["started_at"]:
elapsed = int(time.time() - preview_state["started_at"])
remaining = max(0, PREVIEW_TIMEOUT_SEC - elapsed)
return jsonify({
"status": "started" if preview_state["active"] else "stopped",
"elapsed_seconds": elapsed,
"remaining_seconds": remaining,
"stream_url": f"http://{request.host.split(':')[0]}:{PREVIEW_STREAM_PORT}/?action=stream" if preview_state["active"] else None,
})
@app.route('/preview/metadata')
@app.route('/api/1/preview/metadata')
def preview_metadata():
try:
result = subprocess.run(
["sh", "-c", f"ls -t {FRAMES_ROOT}/*.jpg 2>/dev/null | head -2"],
capture_output=True, text=True, timeout=5
)
names = [n.strip() for n in result.stdout.strip().split('\n') if n.strip()]
if not names:
return jsonify({"error": "no frames available"}), 404
# Skip the pipe frame if it's first
filename = names[0]
if Path(filename).name == "cam0pipe.jpg" and len(names) > 1:
filename = names[1]
p = Path(filename)
stat = p.stat()
return jsonify({
"path": str(p),
"name": p.name,
"date": int(stat.st_mtime * 1000), # milliseconds like original
"size": stat.st_size,
})
except Exception as e:
return jsonify({"error": str(e)}), 500
# -----------------------------------------------------------------------------
# Error Handlers
# -----------------------------------------------------------------------------
@app.errorhandler(404)
def not_found(e):
return json_response({"error": "Not found"}, status=404)
@app.errorhandler(500)
def server_error(e):
return json_response({"error": "Internal server error"}, status=500)
# =============================================================================
# Signal Handlers
# =============================================================================
def signal_handler(signum, frame):
"""Handle shutdown signals gracefully."""
logger.info(f"Received signal {signum}, shutting down...")
shutdown_event.set()
# Give threads time to clean up
time.sleep(2)
sys.exit(0)
# =============================================================================
# Main Entry Point
# =============================================================================
# ── USB Tethering Auto-Connect ────────────────────────────────────────────────
USB_TETHER_IFACE = os.environ.get("USB_TETHER_IFACE", "usb2")
USB_TETHER_POLL_SEC = 3
_usb_tether_state = {
"active": False,
"udhcpc_pid": None,
}
_usb_tether_lock = threading.Lock()
def _read_operstate(iface: str) -> str:
"""Read /sys/class/net/<iface>/operstate — returns 'up', 'down', or 'unknown'."""
try:
path = f"/sys/class/net/{iface}/operstate"
with open(path) as f:
return f.read().strip()
except Exception:
return "missing"
def _is_tether_iface(iface: str) -> bool:
"""Check that interface is a USB network tether (rndis/cdc_ether), not USB storage or other."""
TETHER_DRIVERS = {"rndis_host", "cdc_ether", "cdc_ncm", "cdc_mbim", "r8152", "ax88179_178a"}
try:
uevent_path = f"/sys/class/net/{iface}/device/uevent"
with open(uevent_path) as f:
uevent = f.read()
driver_line = [l for l in uevent.splitlines() if l.startswith("DRIVER=")]
if driver_line:
driver = driver_line[0].split("=", 1)[1].strip()
if driver in TETHER_DRIVERS:
logger.info(f"[usb-tether] {iface} driver={driver} — confirmed tether")
return True
else:
logger.info(f"[usb-tether] {iface} driver={driver} — not a tether device, skipping")
return False
except FileNotFoundError:
# No uevent — check modalias as fallback
try:
modalias_path = f"/sys/class/net/{iface}/device/modalias"
with open(modalias_path) as f:
modalias = f.read().strip()
if any(d in modalias for d in ("rndis", "cdc_ether", "cdc_ncm")):
return True
except Exception:
pass
logger.warning(f"[usb-tether] {iface} — cannot determine driver, skipping for safety")
return False
except Exception as e:
logger.warning(f"[usb-tether] Driver check failed: {e} — skipping")
return False
def _start_usb_tether():
"""Bring up USB tethering interface and get DHCP lease."""
iface = USB_TETHER_IFACE
# Safety check — only proceed if this is actually a tethering device
if not _is_tether_iface(iface):
return
logger.info(f"[usb-tether] {iface} carrier detected — running DHCP")
try:
# Run udhcpc in background, store PID for cleanup
proc = subprocess.Popen(
["udhcpc", "-i", iface, "-q", "-n"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
proc.wait(timeout=15)
with _usb_tether_lock:
_usb_tether_state["active"] = True
_usb_tether_state["udhcpc_pid"] = None # udhcpc -q exits after lease
# Verify we got an IP
result = subprocess.run(
["ip", "addr", "show", iface],
capture_output=True, text=True, timeout=5
)
if "inet " in result.stdout:
ip_line = [l.strip() for l in result.stdout.splitlines() if "inet " in l]
logger.info(f"[usb-tether] Online via {iface}: {ip_line[0] if ip_line else 'IP unknown'}")
else:
logger.warning(f"[usb-tether] udhcpc ran but no IP on {iface}")
with _usb_tether_lock:
_usb_tether_state["active"] = False
except subprocess.TimeoutExpired:
logger.warning(f"[usb-tether] udhcpc timed out on {iface}")
with _usb_tether_lock:
_usb_tether_state["active"] = False
except Exception as e:
logger.warning(f"[usb-tether] udhcpc failed: {e}")
with _usb_tether_lock:
_usb_tether_state["active"] = False
def _stop_usb_tether():
"""Clean up USB tethering — remove routes and release lease."""
iface = USB_TETHER_IFACE
logger.info(f"[usb-tether] {iface} carrier lost — cleaning up")
with _usb_tether_lock:
_usb_tether_state["active"] = False
try:
# Remove default route via this interface if it exists
result = subprocess.run(
["ip", "route", "show", "dev", iface],
capture_output=True, text=True, timeout=5
)
if result.stdout.strip():
subprocess.run(["ip", "route", "flush", "dev", iface], timeout=5)
logger.info(f"[usb-tether] Flushed routes for {iface}")
except Exception as e:
logger.warning(f"[usb-tether] Route cleanup failed: {e}")
try:
subprocess.run(["ip", "link", "set", iface, "down"], timeout=5)
except Exception as e:
logger.warning(f"[usb-tether] ip link set down failed: {e}")
logger.info(f"[usb-tether] Cleanup done — falling back to WiFi if available")
def usb_tether_monitor():
"""Background thread: watch for usb tether interface, auto-connect/disconnect.
usb2 (or USB_TETHER_IFACE) is a dynamic interface — it only exists when the
phone is physically plugged in. This monitor:
1. Waits for the interface to appear
2. Brings it admin-up so the kernel can detect carrier
3. Waits for carrier (operstate=up) = tethering enabled on phone
4. Runs udhcpc to get IP
5. On carrier loss or interface disappearance, cleans up
"""
iface = USB_TETHER_IFACE
logger.info(f"[usb-tether] Monitor started watching for {iface}")
iface_was_present = False
while True:
try:
state = _read_operstate(iface)
iface_present = (state != "missing")
if not iface_present:
# Interface not plugged in yet
if iface_was_present:
logger.info(f"[usb-tether] {iface} removed")
with _usb_tether_lock:
if _usb_tether_state["active"]:
_stop_usb_tether()
iface_was_present = False
time.sleep(USB_TETHER_POLL_SEC)
continue
# Interface exists — ensure it's admin-up so carrier is detectable
if not iface_was_present:
logger.info(f"[usb-tether] {iface} appeared (operstate={state}) — setting admin-up")
try:
subprocess.run(["ip", "link", "set", iface, "up"], timeout=5, check=False)
except Exception:
pass
iface_was_present = True
time.sleep(USB_TETHER_POLL_SEC)
continue
# Interface is up — check carrier
with _usb_tether_lock:
already_active = _usb_tether_state["active"]
if state == "up" and not already_active:
# Carrier arrived — tethering just enabled on phone
if _is_tether_iface(iface):
_start_usb_tether()
else:
logger.info(f"[usb-tether] {iface} carrier up but not a tether device — ignoring")
elif state != "up" and already_active:
# Carrier lost — tethering disabled or phone unplugged
_stop_usb_tether()
except Exception as e:
logger.warning(f"[usb-tether] Monitor error: {e}")
time.sleep(USB_TETHER_POLL_SEC)
def main():
"""Main entry point."""
logger.info("╔════════════════════════════════════════════════════════════╗")
logger.info(f"{VERSION:^56}")
logger.info("║ ADAcam Combined Service (ODC API + Forwarder) ║")
logger.info("╚════════════════════════════════════════════════════════════╝")
# Register signal handlers
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# Load configuration
load_config()
load_forwarder_state()
load_device_info()
logger.info(f"Device ID: {config.get('device_id', 'unknown')}")
logger.info(f"Device serial: {device_info.get('serial', 'unknown')}")
logger.info(f"Database: {ODC_DB_PATH}")
logger.info(f"Images: {IMAGES_DIR}")
logger.info(f"ADAMaps API: {config.get('adamaps_api')}")
# Run system initialization
system_init()
# Start background threads
cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True, name="cleanup")
cleanup_thread.start()
logger.info("Started cleanup worker")
# Start USB tethering monitor
usb_thread = threading.Thread(target=usb_tether_monitor, daemon=True, name="usb-tether")
usb_thread.start()
logger.info(f"Started USB tether monitor on {USB_TETHER_IFACE}")
# Start forwarder thread
fwd_thread = start_forwarder_thread()
if fwd_thread:
logger.info("Started forwarder thread")
# Check Redis
if get_redis():
logger.info("Redis connected")
else:
logger.warning("Redis not available — ML status checks will fail")
# Check database
if ODC_DB_PATH.exists():
logger.info(f"Database found: {ODC_DB_PATH}")
else:
logger.warning(f"Database not found: {ODC_DB_PATH}")
# Run Flask (Thread 1)
logger.info(f"API server starting on http://0.0.0.0:{PORT}")
app.run(host="0.0.0.0", port=PORT, threaded=True, debug=False)
if __name__ == "__main__":
main()