varroa/blackbox/air_aggregator.py
Cobb Hayes 7f9bbd3f21 Rotate AdaMaps ingest+read keys (env-required, no inline default)
Previous values (adamaps-ingest-2026, adamaps-read-2026, mapnet-ingest-2026)
were inline defaults across adamaps + adacam-api + varroa. The ingest key
was briefly anon-visible during the 2026-05-27 Forgejo public-flip when
adacam-api + varroa were public for a short window before the leak was
spotted.

New values live in Vaultwarden:
  - AdaMaps — API_KEY (ingest)
  - AdaMaps — READ_KEY

Validators now hard-fail at boot if the env var is missing. Service is
on hold today; when it resumes, both env vars must be set.
2026-05-27 09:17:23 -07:00

235 lines
8.4 KiB
Python

#!/usr/bin/env python3
"""
air-aggregator.py — Blackbox Air Quality Service
Reads BME680 (I2C) + PMS5003 (UART), fuses with GPS from Bee API,
ships readings to AdaMaps ingest endpoint.
Hardware:
BME680 → I2C (SDA=GPIO2, SCL=GPIO3, addr=0x77)
PMS5003 → UART (/dev/serial0 or /dev/ttyS0, 9600 baud)
Dependencies:
pip install bme680 requests smbus2 pyserial
PMS5003 frames are parsed by hand over the serial port (needs pyserial, but no PMS5003-specific package)
Config via env vars or edit constants below.
"""
import os, time, json, struct, logging, threading
import serial
import smbus2
import bme680
import requests
from datetime import datetime, timezone
# ── Config ────────────────────────────────────────────────────────────────────
# Settings read through os.getenv() take their value from the environment
# variable of the same name; the second argument is the fallback used when
# that variable is unset. BUFFER_FILE is fixed.
BEE_URL = os.getenv("BEE_URL", "http://192.168.197.1:5000")
ADAMAPS_URL = os.getenv("ADAMAPS_URL", "https://api.adamaps.org")
# NOTE(review): the fallback below is a placeholder, not a usable key.
ADAMAPS_KEY = os.getenv("ADAMAPS_KEY", "<your-adamaps-ingest-key>")
DEVICE_ID = os.getenv("DEVICE_ID", "blackbox-pi")
PMS_PORT = os.getenv("PMS_PORT", "/dev/ttyS0")
SEND_INTERVAL = int(os.getenv("SEND_INTERVAL", "10"))  # seconds
BUFFER_FILE = "/tmp/air_buffer.json"  # on-disk backlog of unsent readings
# ──────────────────────────────────────────────────────────────────────────────
# Root logger is configured at import time; every function logs through `log`.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("air-aggregator")
# ── BME680 ────────────────────────────────────────────────────────────────────
def init_bme680():
    """Create and configure the BME680 environmental sensor.

    The primary I2C address is probed first; if nothing answers there the
    secondary address is tried. In the Pimoroni library PRIMARY is 0x76 and
    SECONDARY is 0x77, and this file's header documents the board at 0x77,
    so probing only PRIMARY would miss it.

    Returns:
        A configured ``bme680.BME680`` instance, or ``None`` when the sensor
        could not be reached or configured (the failure is logged).
    """
    try:
        try:
            sensor = bme680.BME680(bme680.I2C_ADDR_PRIMARY)
        except (RuntimeError, OSError):
            # No (or wrong) chip at the primary address — try the other one.
            sensor = bme680.BME680(bme680.I2C_ADDR_SECONDARY)
        sensor.set_humidity_oversample(bme680.OS_2X)
        sensor.set_pressure_oversample(bme680.OS_4X)
        sensor.set_temperature_oversample(bme680.OS_8X)
        sensor.set_filter(bme680.FILTER_SIZE_3)
        # Gas measurement needs the hot plate: profile 0, 320 °C for 150 ms.
        sensor.set_gas_status(bme680.ENABLE_GAS_MEAS)
        sensor.set_gas_heater_temperature(320)
        sensor.set_gas_heater_duration(150)
        sensor.select_gas_heater_profile(0)
        log.info("BME680 initialized")
        return sensor
    except Exception as e:
        # Best effort: the service keeps running without this sensor.
        log.error("BME680 init failed: %s", e)
        return None
def read_bme680(sensor):
    """Take one sample from the BME680.

    Args:
        sensor: The object returned by ``init_bme680()``; may be ``None``.

    Returns:
        A dict with ``temperature_c``, ``humidity_pct``, ``pressure_hpa``
        (each rounded to two decimals) and ``gas_resistance_ohm`` (``None``
        until the sensor reports ``heat_stable``). An empty dict when there
        is no sensor, no fresh data, or the read raised.
    """
    if not sensor:
        return {}
    try:
        if not sensor.get_sensor_data():
            return {}
        sample = sensor.data
        values = {
            "temperature_c": round(sample.temperature, 2),
            "humidity_pct": round(sample.humidity, 2),
            "pressure_hpa": round(sample.pressure, 2),
        }
        if sample.heat_stable:
            values["gas_resistance_ohm"] = sample.gas_resistance
        else:
            values["gas_resistance_ohm"] = None
        return values
    except Exception as exc:
        # Read errors are logged at debug level only and yield no data.
        log.debug("BME680 read error: %s", exc)
    return {}
# ── PMS5003 ───────────────────────────────────────────────────────────────────
# PMS5003 wire format: every frame is 32 bytes long and opens with the two
# start bytes 0x42 0x4d.
PMS_START = bytes((0x42, 0x4D))
PMS_FRAME = 32
def init_pms5003():
    """Open the serial port the PMS5003 is attached to.

    The port named by ``PMS_PORT`` is opened at 9600 baud with a 2 second
    read timeout.

    Returns:
        The open ``serial.Serial`` object, or ``None`` if opening failed
        (the failure is logged).
    """
    try:
        port = serial.Serial(PMS_PORT, baudrate=9600, timeout=2)
    except Exception as exc:
        log.error("PMS5003 init failed: %s", exc)
        return None
    log.info("PMS5003 on %s", PMS_PORT)
    return port
def read_pms5003(ser):
    """Read one PMS5003 frame from ``ser`` and return its particulate values.

    A frame is 32 bytes: two start bytes (0x42 0x4d), a 16-bit length,
    thirteen big-endian 16-bit data words and a 16-bit checksum, which is
    the sum of every preceding byte of the frame.

    Args:
        ser: An object with a ``read(n)`` method returning bytes (the port
            from ``init_pms5003()``); may be ``None``.

    Returns:
        A dict with the atmospheric PM1.0 / PM2.5 / PM10 concentrations and
        the 0.3 / 0.5 / 1.0 / 2.5 µm particle counts. An empty dict when
        there is no port, the port timed out, no frame start was found, the
        frame was truncated or failed its checksum, or the read raised.
    """
    if not ser:
        return {}
    # Everything after the start bytes: length, 13 data words, checksum.
    body = struct.Struct(">H13HH")
    try:
        # Sync to frame start. The scan is bounded (two frames' worth) and
        # stops on a timed-out read, which returns b"", so a silent or noisy
        # line cannot trap the caller forever.
        previous = b""
        for _ in range(2 * (body.size + 2)):
            current = ser.read(1)
            if not current:
                return {}
            if previous == b"\x42" and current == b"\x4d":
                break
            previous = current
        else:
            return {}
        raw = ser.read(body.size)
        if len(raw) < body.size:
            return {}
        _length, *vals, checksum = body.unpack(raw)
        # The checksum also covers the two start bytes consumed above.
        if (0x42 + 0x4D + sum(raw[:-2])) & 0xFFFF != checksum:
            log.debug("PMS5003 checksum mismatch — frame dropped")
            return {}
        # vals[0:3] are the CF=1 concentrations; the atmospheric ones follow.
        return {
            "pm1_0_ug_m3": vals[3],
            "pm2_5_ug_m3": vals[4],
            "pm10_ug_m3": vals[5],
            "particles_0_3_per_dl": vals[6],
            "particles_0_5_per_dl": vals[7],
            "particles_1_0_per_dl": vals[8],
            "particles_2_5_per_dl": vals[9],
        }
    except Exception as e:
        log.debug("PMS5003 read error: %s", e)
    return {}
# ── GPS from Bee ──────────────────────────────────────────────────────────────
# Latest GPS snapshot. Always replaced as a whole dict, never edited in
# place, so a reader that copies it cannot see a half-updated value.
_gps_cache = {}


def _first_present(payload, *keys):
    """Return the first value stored in ``payload`` under ``keys`` that is not None."""
    for key in keys:
        value = payload.get(key)
        if value is not None:
            return value
    return None


def fetch_gps():
    """Fetch the latest position from the Bee API and update the GPS cache.

    Both short (``lat``/``lon``/``alt``) and long (``latitude``/``longitude``/
    ``altitude``) key spellings are accepted. Values are tested against
    ``None`` rather than for truthiness, so a coordinate of exactly 0 is kept.

    ``gps_fix`` is True only when the request succeeded and the payload
    carried both a latitude and a longitude. On an HTTP error or any
    exception the last known coordinates are kept and ``gps_fix`` is set
    to False.

    Returns:
        The current cache dict (``lat``, ``lon``, ``alt``, ``gps_fix``).
    """
    global _gps_cache
    try:
        r = requests.get(f"{BEE_URL}/api/1/gnssConcise/latestValid", timeout=3)
        if r.ok:
            d = r.json()
            lat = _first_present(d, "lat", "latitude")
            lon = _first_present(d, "lon", "longitude")
            _gps_cache = {
                "lat": lat,
                "lon": lon,
                "alt": _first_present(d, "alt", "altitude"),
                "gps_fix": lat is not None and lon is not None,
            }
        else:
            log.debug("GPS fetch returned HTTP %s", r.status_code)
            _gps_cache = {**_gps_cache, "gps_fix": False}
    except Exception as e:
        log.debug("GPS fetch failed: %s", e)
        _gps_cache = {**_gps_cache, "gps_fix": False}
    return _gps_cache
def gps_poller():
    """Thread target: call ``fetch_gps()`` and then sleep one second, forever.

    Never returns, so it is meant to run in a daemon thread.
    """
    while True:
        fetch_gps()
        time.sleep(1)
# ── Buffer ────────────────────────────────────────────────────────────────────
def buffer_reading(reading, path=None):
    """Append ``reading`` to the on-disk backlog of unsent readings.

    The backlog is a JSON list capped at the newest 1000 entries (~3 hours
    at a 10 s interval). A missing file starts an empty backlog; a file that
    is unreadable or does not hold a list is discarded with a warning.

    The new backlog is written to a temporary file and then moved over the
    old one, so a failed or interrupted write cannot destroy what was
    already buffered.

    Args:
        reading: JSON-serialisable record to store.
        path: Backlog file; defaults to ``BUFFER_FILE``.

    Raises:
        OSError: The backlog could not be written.
        TypeError: ``reading`` is not JSON-serialisable.
    """
    if path is None:
        path = BUFFER_FILE
    backlog = []
    try:
        with open(path, encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        pass  # nothing buffered yet
    except (OSError, ValueError) as e:
        log.warning("Discarding unreadable buffer %s: %s", path, e)
    else:
        if isinstance(loaded, list):
            backlog = loaded
        else:
            log.warning("Discarding buffer %s: expected a JSON list", path)
    backlog.append(reading)
    # Keep only the newest 1000 readings.
    if len(backlog) > 1000:
        backlog = backlog[-1000:]
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(backlog, f)
    os.replace(tmp_path, path)
def flush_buffer(path=None):
    """Send the buffered backlog to AdaMaps and clear it on success.

    Does nothing when the backlog file is missing, unreadable, empty or does
    not contain a JSON list (an unreadable file is replaced the next time a
    reading is buffered). The whole backlog goes out in one POST; the file is
    reset to an empty list only after a successful response, so a failed
    flush is retried on the next call.

    Args:
        path: Backlog file; defaults to ``BUFFER_FILE``.
    """
    if path is None:
        path = BUFFER_FILE
    try:
        with open(path, encoding="utf-8") as f:
            buf = json.load(f)
    except (OSError, ValueError):
        return
    if not buf or not isinstance(buf, list):
        return
    try:
        r = requests.post(
            f"{ADAMAPS_URL}/api/ingest/air",
            json={"device_id": DEVICE_ID, "readings": buf},
            headers={"X-AdaMaps-Key": ADAMAPS_KEY},
            timeout=15,
        )
        if r.ok:
            log.info("Flushed %d buffered readings", len(buf))
            with open(path, "w", encoding="utf-8") as f:
                json.dump([], f)
        else:
            log.warning("Flush failed: %s", r.status_code)
    except Exception as e:
        # Best effort: network and disk errors both leave the backlog for
        # the next attempt.
        log.warning("Flush error: %s", e)
# ── Main loop ─────────────────────────────────────────────────────────────────
def _build_reading(bme_data, pms_data, gps):
    """Merge sensor values and a GPS snapshot into one ingest record."""
    return {
        "device_id": DEVICE_ID,
        "sampled_at": datetime.now(timezone.utc).isoformat(),
        "lat": gps.get("lat"),
        "lon": gps.get("lon"),
        "alt": gps.get("alt"),
        "gps_fix": gps.get("gps_fix", False),
        **bme_data,
        **pms_data,
    }


def _buffer_safely(reading):
    """Buffer ``reading`` on disk, logging instead of raising on failure."""
    try:
        buffer_reading(reading)
    except (OSError, TypeError, ValueError) as e:
        log.error("Could not buffer reading: %s", e)


def _send_or_buffer(reading):
    """Drain the backlog, send ``reading`` live, and buffer it if that fails."""
    try:
        flush_buffer()  # drain any backlog first so readings stay in order
        r = requests.post(
            f"{ADAMAPS_URL}/api/ingest/air",
            json={"device_id": DEVICE_ID, "readings": [reading]},
            headers={"X-AdaMaps-Key": ADAMAPS_KEY},
            timeout=10,
        )
    except Exception as e:
        log.warning("Offline (%s) — buffering", e)
        _buffer_safely(reading)
        return
    if r.ok:
        log.info("Sent | PM2.5=%.1f lat=%s lon=%s",
                 reading.get("pm2_5_ug_m3", 0),
                 reading.get("lat"), reading.get("lon"))
    else:
        log.warning("Send failed %s — buffering", r.status_code)
        _buffer_safely(reading)


def main():
    """Run the aggregator: sample every second, ship every SEND_INTERVAL.

    Initialises both sensors, starts the GPS poller in a daemon thread, then
    loops forever. Each pass reads the BME680 and PMS5003 and combines them
    with the latest GPS snapshot. Once ``SEND_INTERVAL`` seconds have passed
    since the last attempt, the reading is sent to AdaMaps, or buffered on
    disk if the send fails.

    The interval is measured on the monotonic clock, so wall-clock steps
    (for example an NTP sync after boot) neither stall nor burst the sends.
    """
    if not ADAMAPS_KEY or ADAMAPS_KEY == "<your-adamaps-ingest-key>":
        log.warning("ADAMAPS_KEY is unset or still the placeholder — "
                    "ingest requests will probably be rejected")
    bme = init_bme680()
    pms = init_pms5003()
    # GPS in background thread
    threading.Thread(target=gps_poller, daemon=True).start()
    log.info("Air aggregator started — sending every %ds", SEND_INTERVAL)
    last_send = float("-inf")  # guarantees a send on the first pass
    while True:
        bme_data = read_bme680(bme)
        pms_data = read_pms5003(pms)
        # Copy the cache so the poller thread cannot change it mid-build.
        reading = _build_reading(bme_data, pms_data, dict(_gps_cache))
        now = time.monotonic()
        if now - last_send >= SEND_INTERVAL:
            _send_or_buffer(reading)
            last_send = now
        time.sleep(1)


if __name__ == "__main__":
    main()