"""GPS data reader for adacam-api. Primary source: /data/recording/odc-api.db framekms table (confirmed schema from live device). Fallback: Redis (keys may appear when device has outdoor GPS fix post-liberation). Confirmed field names from live device: latitude, longitude, altitude, hdop, satellites_used, speed, heading, time """ import json import os import sqlite3 import time ODC_DB = '/data/recording/odc-api.db' GNSS_REDIS_KEYS = ['GNSSFusion30Hz', 'GnssData', 'GnssPvt', 'GnssPosition'] _redis_client = None def _get_redis(): global _redis_client try: import redis if _redis_client is None: _redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True) _redis_client.ping() return _redis_client except Exception: _redis_client = None return None def get_latest_gnss(): """Get latest GPS fix. Returns dict with keys: lat_deg, lon_deg, alt_m, hdop, num_satellites. Field names kept compatible with original API response format.""" # Primary: SQLite framekms (confirmed live device schema) if os.path.exists(ODC_DB): try: conn = sqlite3.connect(ODC_DB) row = conn.execute( 'SELECT latitude, longitude, altitude, hdop, satellites_used, time ' 'FROM framekms WHERE latitude IS NOT NULL AND latitude != 0 ' 'ORDER BY time DESC LIMIT 1' ).fetchone() conn.close() if row: return { 'lat_deg': row[0], 'lon_deg': row[1], 'alt_m': row[2], 'hdop': row[3], 'num_satellites': row[4], 'unix_milliseconds': int(row[5]) if row[5] else int(time.time() * 1000), } except Exception: pass # Fallback: Redis (may appear post-liberation with outdoor GPS fix) r = _get_redis() if r: for key in GNSS_REDIS_KEYS: try: for getter in [lambda k: r.zrevrange(k, 0, 0), lambda k: [r.get(k)]]: items = getter(key) item = items[0] if items else None if item: data = json.loads(item) lat = data.get('latitude') or data.get('lat_deg') lon = data.get('longitude') or data.get('lon_deg') if lat and lon: return { 'lat_deg': lat, 'lon_deg': lon, 'alt_m': data.get('altitude') or data.get('alt_m', 0), 'hdop': data.get('hdop', 99), 'num_satellites': data.get('satellites_used') or data.get('num_satellites', 0), 'unix_milliseconds': int(data.get('unix_milliseconds', time.time() * 1000)), } except Exception: continue return None def has_gps_lock(): """Check if we have a valid GPS lock.""" gnss = get_latest_gnss() return gnss is not None and gnss.get('num_satellites', 0) >= 4