"""SQLite database for landmarks and offline queue."""
import sqlite3
import threading
import json

DB_PATH = "/data/adacam/adacam.db"

# Holds one sqlite3 connection per thread (see get_conn).
_local = threading.local()


def get_conn() -> sqlite3.Connection:
    """Return the calling thread's database connection, opening it on first use.

    The connection is cached on a ``threading.local`` object, so each thread
    gets its own connection to ``DB_PATH``. Rows are returned as
    ``sqlite3.Row`` so columns can be accessed by name.
    """
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = sqlite3.connect(DB_PATH, check_same_thread=False)
        conn.row_factory = sqlite3.Row
        _local.conn = conn
    return conn


def init() -> None:
    """Create the tables and indexes if they do not exist yet.

    Creates ``landmarks`` (one row per detection, with a ``forwarded`` flag
    defaulting to 0) and ``forward_queue`` (JSON payloads waiting to be
    forwarded), plus indexes on ``landmarks.ts`` and ``landmarks.forwarded``.
    Every statement uses ``IF NOT EXISTS``, so calling this repeatedly is safe.
    """
    conn = get_conn()
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS landmarks (
            id INTEGER PRIMARY KEY,
            class_label TEXT,
            class_label_confidence REAL,
            overall_confidence REAL,
            lat REAL,
            lon REAL,
            alt REAL,
            azimuth REAL,
            width INTEGER,
            height INTEGER,
            ts INTEGER,
            forwarded INTEGER DEFAULT 0
        );
        CREATE TABLE IF NOT EXISTS forward_queue (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            payload TEXT,
            created_at INTEGER DEFAULT (strftime('%s','now'))
        );
        CREATE INDEX IF NOT EXISTS idx_landmarks_ts ON landmarks(ts DESC);
        CREATE INDEX IF NOT EXISTS idx_landmarks_forwarded ON landmarks(forwarded);
    """)
    conn.commit()


def insert_landmark(data) -> int:
    """Insert a landmark detection and return its row id.

    Args:
        data: Mapping with the detection fields. ``class_label`` is required;
            ``id``, ``class_label_confidence``, ``overall_confidence``,
            ``lat``, ``lon``, ``alt``, ``azimuth``, ``width``, ``height`` and
            ``ts`` are optional and stored as NULL when absent. When ``id`` is
            absent SQLite assigns the row id.

    Returns:
        The id of the inserted row.

    Raises:
        KeyError: If ``class_label`` is missing from ``data``.
        sqlite3.IntegrityError: If a row with the given ``id`` already exists.
    """
    values = (
        data.get("id"),
        data["class_label"],
        data.get("class_label_confidence"),
        data.get("overall_confidence"),
        data.get("lat"),
        data.get("lon"),
        data.get("alt"),
        data.get("azimuth"),
        data.get("width"),
        data.get("height"),
        data.get("ts"),
    )
    conn = get_conn()
    # `with conn` commits on success and rolls back on failure, so a rejected
    # insert does not leave a transaction open on this thread's connection.
    with conn:
        cursor = conn.execute(
            """
            INSERT INTO landmarks (id, class_label, class_label_confidence,
                                   overall_confidence, lat, lon, alt, azimuth,
                                   width, height, ts)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            values,
        )
    return cursor.lastrowid


def get_last_landmarks(n) -> list:
    """Return up to ``n`` landmarks as dicts, ordered by ``ts`` descending."""
    rows = get_conn().execute(
        "SELECT * FROM landmarks ORDER BY ts DESC LIMIT ?", (n,)
    ).fetchall()
    return [dict(row) for row in rows]


def queue_forward(payload) -> None:
    """Append ``payload`` (JSON-serialized) to the forward queue.

    Raises:
        TypeError: If ``payload`` is not JSON-serializable.
    """
    encoded = json.dumps(payload)
    conn = get_conn()
    with conn:  # commit on success, roll back on failure
        conn.execute("INSERT INTO forward_queue (payload) VALUES (?)", (encoded,))


def pop_queue(limit=50) -> list:
    """Remove and return the oldest queued payloads.

    Args:
        limit: Maximum number of entries to pop.

    Returns:
        A list of ``(id, payload)`` tuples in ascending id order, with each
        payload decoded from JSON. Empty when the queue is empty.
    """
    conn = get_conn()
    rows = conn.execute(
        "SELECT id, payload FROM forward_queue ORDER BY id LIMIT ?", (limit,)
    ).fetchall()
    if rows:
        # One bound parameter per statement: unlike a single
        # "IN (?, ?, ...)" clause this cannot exceed SQLite's limit on bound
        # parameters however large `limit` is. The surrounding `with` keeps
        # all deletes in one transaction (rolled back if any of them fails).
        with conn:
            conn.executemany(
                "DELETE FROM forward_queue WHERE id = ?",
                [(row["id"],) for row in rows],
            )
    return [(row["id"], json.loads(row["payload"])) for row in rows]


def mark_forwarded(landmark_id) -> None:
    """Set ``forwarded = 1`` on the landmark with the given id."""
    conn = get_conn()
    with conn:  # commit on success, roll back on failure
        conn.execute(
            "UPDATE landmarks SET forwarded = 1 WHERE id = ?", (landmark_id,)
        )