refactor: compact services to 223 lines total

This commit is contained in:
Kayos 2026-03-14 10:58:55 -07:00
parent 7fc0c4b21d
commit 86188ef54f
2 changed files with 96 additions and 372 deletions

View file

@ -1,239 +1,114 @@
#!/usr/bin/env python3
"""
adacam-capture: GStreamer-based camera capture service for AdaCam.
Manages the Keem Bay camera pipeline, captures JPEG frames, and publishes
frame paths to Redis for downstream consumers.
"""
import json
import logging
import os
import signal
import subprocess
import sys
import time
"""adacam-capture: GStreamer camera capture for AdaCam/Keem Bay."""
import json, logging, os, signal, subprocess, sys, time
from pathlib import Path
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
stream=sys.stdout
)
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', stream=sys.stdout)
log = logging.getLogger('adacam-capture')
# Paths
CONFIG_PATH = Path('/data/adacam/config.json')
FRAME_DIR = Path('/tmp/adacam/pics')
PIPE_FRAME = Path('/tmp/recording/pics/cam0pipe.jpg')
VPU_FWNAME = '/sys/devices/platform/soc/soc:vpusmm/fwname'
# Limits
MAX_FILES = 1300
MAX_BYTES = 500 * 1024 * 1024 # 500MB
# Kernel modules needed
MAX_FILES, MAX_BYTES = 1300, 500 * 1024 * 1024
MODULES = ['kmb_flash', 'kmb_lens', 'kmb_cam', 'pwm_keembay', 'hantro']
GST_PIPELINE = '''kmbcamsrc num-frames=-1 name=kmbsrc transform-hub=full awb-mode=daylight
stride-align=64 scanline-align=64
! capsfilter caps="video/x-raw(memory:DMABuf),format=NV12,width=2028,height=1024,framerate=30/1"
! vaapijpegenc ! multifilesink location=/tmp/recording/pics/cam0pipe.jpg'''
# GStreamer pipeline (JPEG capture at 2028x1024 @ 30fps)
GST_PIPELINE = '''
kmbcamsrc num-frames=-1 name=kmbsrc transform-hub=full awb-mode=daylight \
stride-align=64 scanline-align=64 \
! capsfilter caps="video/x-raw(memory:DMABuf),format=NV12,width=2028,height=1024,framerate=30/1" \
! vaapijpegenc \
! multifilesink location=/tmp/recording/pics/cam0pipe.jpg
'''
running = True
redis_client = None
running, redis_client = True, None
def load_config():
"""Load config from JSON file."""
if CONFIG_PATH.exists():
with open(CONFIG_PATH) as f:
return json.load(f)
log.warning(f'Config not found at {CONFIG_PATH}, using defaults')
with open(CONFIG_PATH) as f: return json.load(f)
return {'device_id': 'unknown', 'adamaps_key': '', 'adamaps_api': ''}
def get_redis():
"""Get Redis client with lazy init and reconnection."""
global redis_client
if redis_client is not None:
try:
redis_client.ping()
return redis_client
except Exception:
redis_client = None
if redis_client:
try: redis_client.ping(); return redis_client
except: redis_client = None
try:
import redis
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
redis_client.ping()
log.info('Connected to Redis')
return redis_client
except Exception as e:
log.warning(f'Redis unavailable: {e}')
return None
redis_client.ping(); log.info('Connected to Redis'); return redis_client
except Exception as e: log.warning(f'Redis unavailable: {e}'); return None
def load_modules():
"""Load required kernel modules if not already loaded."""
loaded = set()
try:
with open('/proc/modules') as f:
for line in f:
loaded.add(line.split()[0])
except Exception as e:
log.warning(f'Could not read /proc/modules: {e}')
return
with open('/proc/modules') as f: loaded = {l.split()[0] for l in f}
except: return
for mod in MODULES:
if mod not in loaded:
log.info(f'Loading module: {mod}')
try:
subprocess.run(['modprobe', mod], check=True, capture_output=True)
except subprocess.CalledProcessError as e:
log.warning(f'Failed to load {mod}: {e.stderr.decode().strip()}')
try: subprocess.run(['modprobe', mod], check=True, capture_output=True)
except subprocess.CalledProcessError as e: log.warning(f'Failed to load {mod}')
def load_vpu_firmware():
"""Load VPU firmware for Myriad X."""
if os.path.exists(VPU_FWNAME):
try:
with open(VPU_FWNAME, 'w') as f:
f.write('luxonis_vpu.bin')
with open(VPU_FWNAME, 'w') as f: f.write('luxonis_vpu.bin')
log.info('VPU firmware loaded')
except Exception as e:
log.warning(f'Could not load VPU firmware: {e}')
except Exception as e: log.warning(f'VPU firmware error: {e}')
else:
# Try StartVpu command
try:
subprocess.run(['StartVpu', 'luxonis_vpu.bin'], check=True, capture_output=True)
log.info('VPU firmware loaded via StartVpu')
except Exception as e:
log.warning(f'StartVpu not available or failed: {e}')
try: subprocess.run(['StartVpu', 'luxonis_vpu.bin'], check=True, capture_output=True); log.info('VPU loaded via StartVpu')
except: log.warning('StartVpu not available')
def start_gstreamer():
"""Start the GStreamer pipeline as a subprocess."""
PIPE_FRAME.parent.mkdir(parents=True, exist_ok=True)
FRAME_DIR.mkdir(parents=True, exist_ok=True)
cmd = ['gst-launch-1.0', '-e'] + GST_PIPELINE.split()
log.info('Starting GStreamer pipeline')
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
return subprocess.Popen(['gst-launch-1.0', '-e'] + GST_PIPELINE.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def cleanup_frames():
"""Remove old frames to stay under limits."""
try:
frames = sorted(FRAME_DIR.glob('*.jpg'), key=lambda p: p.stat().st_mtime)
except Exception:
return
# Remove by count
try: frames = sorted(FRAME_DIR.glob('*.jpg'), key=lambda p: p.stat().st_mtime)
except: return
while len(frames) > MAX_FILES:
try:
frames[0].unlink()
frames.pop(0)
except Exception:
break
# Remove by size
try: frames[0].unlink(); frames.pop(0)
except: break
total = sum(f.stat().st_size for f in frames if f.exists())
while total > MAX_BYTES and frames:
try:
size = frames[0].stat().st_size
frames[0].unlink()
frames.pop(0)
total -= size
except Exception:
break
try: sz = frames[0].stat().st_size; frames[0].unlink(); frames.pop(0); total -= sz
except: break
def process_frame():
"""Check for new frame, rename with timestamp, publish to Redis."""
if not PIPE_FRAME.exists():
return False
if not PIPE_FRAME.exists(): return False
try:
ts = int(time.time() * 1000)
dest = FRAME_DIR / f'{ts}.jpg'
# Atomic rename
os.rename(PIPE_FRAME, dest)
# Publish to Redis
r = get_redis()
if r:
try:
r.publish('adacam:frames', str(dest))
except Exception as e:
log.warning(f'Redis publish failed: {e}')
try: r.publish('adacam:frames', str(dest))
except Exception as e: log.warning(f'Redis publish failed: {e}')
return True
except FileNotFoundError:
return False # Race condition, frame already moved
except Exception as e:
log.error(f'Frame processing error: {e}')
return False
except FileNotFoundError: return False
except Exception as e: log.error(f'Frame error: {e}'); return False
def handle_signal(signum, frame):
"""Handle shutdown signals."""
global running
log.info(f'Received signal {signum}, shutting down')
running = False
log.info(f'Signal {signum}, shutting down'); running = False
def main():
global running
signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal); signal.signal(signal.SIGINT, handle_signal)
config = load_config()
log.info(f"AdaCam Capture starting (device: {config.get('device_id', 'unknown')})")
# Initialize hardware
load_modules()
load_vpu_firmware()
gst_proc = None
last_cleanup = 0
load_modules(); load_vpu_firmware()
gst_proc, last_cleanup = None, 0
while running:
# Start/restart GStreamer if needed
if gst_proc is None or gst_proc.poll() is not None:
if gst_proc is not None:
log.warning(f'GStreamer exited with code {gst_proc.returncode}, restarting')
time.sleep(2) # Brief pause before restart
if gst_proc: log.warning(f'GStreamer exited {gst_proc.returncode}, restarting'); time.sleep(2)
gst_proc = start_gstreamer()
# Process any new frames
frame_found = process_frame()
# Periodic cleanup (every 60s)
now = time.time()
if now - last_cleanup > 60:
cleanup_frames()
last_cleanup = now
# Poll interval
if now - last_cleanup > 60: cleanup_frames(); last_cleanup = now
time.sleep(0.01 if frame_found else 0.05)
# Cleanup
if gst_proc and gst_proc.poll() is None:
log.info('Stopping GStreamer pipeline')
gst_proc.terminate()
try:
gst_proc.wait(timeout=5)
except subprocess.TimeoutExpired:
gst_proc.kill()
log.info('Stopping GStreamer'); gst_proc.terminate()
try: gst_proc.wait(timeout=5)
except: gst_proc.kill()
log.info('AdaCam Capture stopped')
if __name__ == '__main__':
main()
if __name__ == '__main__': main()

View file

@ -1,260 +1,109 @@
#!/usr/bin/env python3
"""
adacam-forwarder: GPS-tagged frame forwarder for AdaMaps.
Subscribes to Redis frame notifications, combines with GPS data,
and POSTs to the AdaMaps ingest API. Queues failures for retry.
"""
import json
import logging
import os
import sqlite3
import sys
import threading
import time
"""adacam-forwarder: GPS-tagged frame forwarder for AdaMaps."""
import json, logging, sqlite3, sys, threading, time
from pathlib import Path
import requests
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
stream=sys.stdout
)
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', stream=sys.stdout)
log = logging.getLogger('adacam-forwarder')
CONFIG_PATH = Path('/data/adacam/config.json')
QUEUE_DB = Path('/data/adacam/forward_queue.db')
# Globals
config = {}
redis_client = None
running = True
config, redis_client, running = {}, None, True
def load_config():
"""Load config from JSON file."""
if CONFIG_PATH.exists():
with open(CONFIG_PATH) as f:
return json.load(f)
log.warning(f'Config not found at {CONFIG_PATH}')
with open(CONFIG_PATH) as f: return json.load(f)
return {'device_id': 'unknown', 'adamaps_key': '', 'adamaps_api': 'https://api.adamaps.org'}
def init_queue_db():
"""Initialize SQLite queue for offline tolerance."""
QUEUE_DB.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(QUEUE_DB))
conn.execute('''
CREATE TABLE IF NOT EXISTS queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
payload TEXT NOT NULL,
created_at REAL NOT NULL,
attempts INTEGER DEFAULT 0
)
''')
conn.commit()
return conn
conn.execute('CREATE TABLE IF NOT EXISTS queue (id INTEGER PRIMARY KEY, payload TEXT, created_at REAL, attempts INTEGER DEFAULT 0)')
conn.commit(); return conn
def get_redis():
"""Get Redis client with reconnection."""
global redis_client
if redis_client is not None:
try:
redis_client.ping()
return redis_client
except Exception:
redis_client = None
if redis_client:
try: redis_client.ping(); return redis_client
except: redis_client = None
try:
import redis
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
redis_client.ping()
log.info('Connected to Redis')
return redis_client
except Exception as e:
log.warning(f'Redis unavailable: {e}')
return None
redis_client.ping(); log.info('Connected to Redis'); return redis_client
except Exception as e: log.warning(f'Redis unavailable: {e}'); return None
def get_latest_gps(r):
"""Get latest GPS from GNSSFusion30Hz ZSET."""
try:
# Get newest entry (highest score)
results = r.zrevrange('GNSSFusion30Hz', 0, 0, withscores=True)
if results:
data = json.loads(results[0][0])
return {
'lat': data.get('lat_deg'),
'lon': data.get('lon_deg'),
'alt': data.get('alt_m'),
'ts': int(data.get('unix_milliseconds', time.time() * 1000))
}
except Exception as e:
log.warning(f'Failed to get GPS: {e}')
return {'lat': data.get('lat_deg'), 'lon': data.get('lon_deg'), 'alt': data.get('alt_m'),
'ts': int(data.get('unix_milliseconds', time.time() * 1000))}
except Exception as e: log.warning(f'GPS error: {e}')
return None
def queue_payload(db, payload):
"""Queue a failed payload for later retry."""
try:
db.execute(
'INSERT INTO queue (payload, created_at) VALUES (?, ?)',
(json.dumps(payload), time.time())
)
db.commit()
except Exception as e:
log.error(f'Failed to queue payload: {e}')
try: db.execute('INSERT INTO queue (payload, created_at) VALUES (?, ?)', (json.dumps(payload), time.time())); db.commit()
except Exception as e: log.error(f'Queue error: {e}')
def post_to_adamaps(payload):
"""POST frame data to AdaMaps API. Returns True on success."""
if not config.get('adamaps_key'):
log.warning('No AdaMaps API key configured')
return False
api_url = config.get('adamaps_api', 'https://api.adamaps.org')
url = f'{api_url}/api/ingest'
headers = {
'X-AdaMaps-Key': config['adamaps_key'],
'Content-Type': 'application/json'
}
if not config.get('adamaps_key'): return False
url = f"{config.get('adamaps_api', 'https://api.adamaps.org')}/api/ingest"
try:
resp = requests.post(url, json=payload, headers=headers, timeout=10)
if resp.status_code == 200:
return True
elif resp.status_code == 429:
log.warning('Rate limited by AdaMaps')
return False
else:
log.warning(f'AdaMaps returned {resp.status_code}: {resp.text[:100]}')
return False
except requests.RequestException as e:
log.warning(f'AdaMaps request failed: {e}')
return False
resp = requests.post(url, json=payload, headers={'X-AdaMaps-Key': config['adamaps_key'], 'Content-Type': 'application/json'}, timeout=10)
if resp.status_code == 200: return True
if resp.status_code == 429: log.warning('Rate limited')
else: log.warning(f'AdaMaps {resp.status_code}')
except requests.RequestException as e: log.warning(f'Request failed: {e}')
return False
def process_queue(db):
"""Retry queued payloads."""
try:
cursor = db.execute(
'SELECT id, payload, attempts FROM queue ORDER BY created_at LIMIT 50'
)
rows = cursor.fetchall()
for row_id, payload_json, attempts in rows:
if attempts > 10:
# Give up after 10 attempts
db.execute('DELETE FROM queue WHERE id = ?', (row_id,))
continue
payload = json.loads(payload_json)
if post_to_adamaps(payload):
db.execute('DELETE FROM queue WHERE id = ?', (row_id,))
else:
db.execute(
'UPDATE queue SET attempts = ? WHERE id = ?',
(attempts + 1, row_id)
)
time.sleep(0.1) # Rate limit
for row_id, payload_json, attempts in db.execute('SELECT id, payload, attempts FROM queue ORDER BY created_at LIMIT 50').fetchall():
if attempts > 10: db.execute('DELETE FROM queue WHERE id = ?', (row_id,)); continue
if post_to_adamaps(json.loads(payload_json)): db.execute('DELETE FROM queue WHERE id = ?', (row_id,))
else: db.execute('UPDATE queue SET attempts = ? WHERE id = ?', (attempts + 1, row_id))
time.sleep(0.1)
db.commit()
except Exception as e:
log.error(f'Queue processing error: {e}')
except Exception as e: log.error(f'Queue process error: {e}')
def queue_worker(db):
"""Background thread to retry queued items."""
while running:
time.sleep(60)
if running:
process_queue(db)
while running: time.sleep(60); running and process_queue(db)
def handle_frame(frame_path, db):
"""Process a single frame notification."""
path = Path(frame_path)
if not path.exists():
log.warning(f'Frame not found: {frame_path}')
return
r = get_redis()
gps = get_latest_gps(r) if r else None
try:
frame_size = path.stat().st_size
except Exception:
frame_size = 0
# Extract timestamp from filename
try:
ts = int(path.stem)
except ValueError:
ts = int(time.time() * 1000)
payload = {
'device_id': config.get('device_id', 'unknown'),
'ts': ts,
'lat': gps['lat'] if gps else None,
'lon': gps['lon'] if gps else None,
'alt': gps['alt'] if gps else None,
'frame_path': str(path),
'frame_size': frame_size
}
if not post_to_adamaps(payload):
queue_payload(db, payload)
if not path.exists(): return
r = get_redis(); gps = get_latest_gps(r) if r else None
try: frame_size = path.stat().st_size
except: frame_size = 0
try: ts = int(path.stem)
except: ts = int(time.time() * 1000)
payload = {'device_id': config.get('device_id', 'unknown'), 'ts': ts,
'lat': gps['lat'] if gps else None, 'lon': gps['lon'] if gps else None,
'alt': gps['alt'] if gps else None, 'frame_path': str(path), 'frame_size': frame_size}
if not post_to_adamaps(payload): queue_payload(db, payload)
def main():
global config, running
config = load_config()
log.info(f"AdaCam Forwarder starting (device: {config.get('device_id', 'unknown')})")
db = init_queue_db()
# Start queue retry thread
queue_thread = threading.Thread(target=queue_worker, args=(db,), daemon=True)
queue_thread.start()
# Main loop: subscribe to Redis frames channel
threading.Thread(target=queue_worker, args=(db,), daemon=True).start()
backoff = 1
while running:
r = get_redis()
if not r:
log.info(f'Waiting for Redis (backoff {backoff}s)...')
time.sleep(backoff)
backoff = min(backoff * 2, 30)
continue
if not r: log.info(f'Waiting for Redis ({backoff}s)...'); time.sleep(backoff); backoff = min(backoff * 2, 30); continue
backoff = 1
try:
import redis as redis_module
pubsub = r.pubsub()
pubsub.subscribe('adacam:frames')
log.info('Subscribed to adacam:frames')
for message in pubsub.listen():
if not running:
break
if message['type'] == 'message':
handle_frame(message['data'], db)
except redis_module.ConnectionError:
log.warning('Redis connection lost')
redis_client = None
except Exception as e:
log.error(f'Subscription error: {e}')
time.sleep(1)
pubsub = r.pubsub(); pubsub.subscribe('adacam:frames'); log.info('Subscribed to adacam:frames')
for msg in pubsub.listen():
if not running: break
if msg['type'] == 'message': handle_frame(msg['data'], db)
except: log.warning('Redis connection lost'); time.sleep(1)
log.info('AdaCam Forwarder stopped')
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
running = False
try: main()
except KeyboardInterrupt: running = False