#!/usr/bin/env python3 """ air-aggregator.py — Blackbox Air Quality Service Reads BME680 (I2C) + PMS5003 (UART), fuses with GPS from Bee API, ships readings to AdaMaps ingest endpoint. Hardware: BME680 → I2C (SDA=GPIO2, SCL=GPIO3, addr=0x77) PMS5003 → UART (/dev/serial0 or /dev/ttyS0, 9600 baud) Dependencies: pip install bme680 requests smbus2 PMS5003 is read manually (no dep needed) Config via env vars or edit constants below. """ import os, time, json, struct, logging, threading import serial import smbus2 import bme680 import requests from datetime import datetime, timezone # ── Config ──────────────────────────────────────────────────────────────────── BEE_URL = os.environ.get("BEE_URL", "http://192.168.197.1:5000") ADAMAPS_URL = os.environ.get("ADAMAPS_URL", "https://api.adamaps.org") ADAMAPS_KEY = os.environ.get("ADAMAPS_KEY", "") DEVICE_ID = os.environ.get("DEVICE_ID", "blackbox-pi") PMS_PORT = os.environ.get("PMS_PORT", "/dev/ttyS0") SEND_INTERVAL = int(os.environ.get("SEND_INTERVAL", "10")) # seconds BUFFER_FILE = "/tmp/air_buffer.json" # ────────────────────────────────────────────────────────────────────────────── logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") log = logging.getLogger("air-aggregator") # ── BME680 ──────────────────────────────────────────────────────────────────── def init_bme680(): try: sensor = bme680.BME680(bme680.I2C_ADDR_PRIMARY) sensor.set_humidity_oversample(bme680.OS_2X) sensor.set_pressure_oversample(bme680.OS_4X) sensor.set_temperature_oversample(bme680.OS_8X) sensor.set_filter(bme680.FILTER_SIZE_3) sensor.set_gas_status(bme680.ENABLE_GAS_MEAS) sensor.set_gas_heater_temperature(320) sensor.set_gas_heater_duration(150) sensor.select_gas_heater_profile(0) log.info("BME680 initialized") return sensor except Exception as e: log.error("BME680 init failed: %s", e) return None def read_bme680(sensor): if not sensor: return {} try: if sensor.get_sensor_data(): return { "temperature_c": round(sensor.data.temperature, 2), "humidity_pct": round(sensor.data.humidity, 2), "pressure_hpa": round(sensor.data.pressure, 2), "gas_resistance_ohm": sensor.data.gas_resistance if sensor.data.heat_stable else None, } except Exception as e: log.debug("BME680 read error: %s", e) return {} # ── PMS5003 ─────────────────────────────────────────────────────────────────── # Protocol: 32-byte frames, start bytes 0x42 0x4d PMS_START = b'\x42\x4d' PMS_FRAME = 32 def init_pms5003(): try: ser = serial.Serial(PMS_PORT, baudrate=9600, timeout=2) log.info("PMS5003 on %s", PMS_PORT) return ser except Exception as e: log.error("PMS5003 init failed: %s", e) return None def read_pms5003(ser): if not ser: return {} try: # Sync to frame start while True: b = ser.read(1) if b == b'\x42': b2 = ser.read(1) if b2 == b'\x4d': break frame = ser.read(PMS_FRAME - 2) if len(frame) < PMS_FRAME - 2: return {} # Parse: skip length (2 bytes), then 13 uint16s vals = struct.unpack('>13H', frame[2:28]) return { "pm1_0_ug_m3": vals[3], # atmospheric env "pm2_5_ug_m3": vals[4], "pm10_ug_m3": vals[5], "particles_0_3_per_dl": vals[6], "particles_0_5_per_dl": vals[7], "particles_1_0_per_dl": vals[8], "particles_2_5_per_dl": vals[9], } except Exception as e: log.debug("PMS5003 read error: %s", e) return {} # ── GPS from Bee ────────────────────────────────────────────────────────────── _gps_cache = {} def fetch_gps(): global _gps_cache try: r = requests.get(f"{BEE_URL}/api/1/gnssConcise/latestValid", timeout=3) if r.ok: d = r.json() _gps_cache = { "lat": d.get("lat") or d.get("latitude"), "lon": d.get("lon") or d.get("longitude"), "alt": d.get("alt") or d.get("altitude"), "gps_fix": True, } except Exception as e: log.debug("GPS fetch failed: %s", e) _gps_cache["gps_fix"] = False return _gps_cache def gps_poller(): while True: fetch_gps() time.sleep(1) # ── Buffer ──────────────────────────────────────────────────────────────────── def buffer_reading(reading): buf = [] try: with open(BUFFER_FILE) as f: buf = json.load(f) except: pass buf.append(reading) # Cap buffer at 1000 readings (~3 hours at 10s interval) if len(buf) > 1000: buf = buf[-1000:] with open(BUFFER_FILE, 'w') as f: json.dump(buf, f) def flush_buffer(): try: with open(BUFFER_FILE) as f: buf = json.load(f) except: return if not buf: return try: r = requests.post( f"{ADAMAPS_URL}/api/ingest/air", json={"device_id": DEVICE_ID, "readings": buf}, headers={"X-AdaMaps-Key": ADAMAPS_KEY}, timeout=15 ) if r.ok: log.info("Flushed %d buffered readings", len(buf)) with open(BUFFER_FILE, 'w') as f: json.dump([], f) else: log.warning("Flush failed: %s", r.status_code) except Exception as e: log.warning("Flush error: %s", e) # ── Main loop ───────────────────────────────────────────────────────────────── def main(): bme = init_bme680() pms = init_pms5003() # GPS in background thread t = threading.Thread(target=gps_poller, daemon=True) t.start() log.info("Air aggregator started — sending every %ds", SEND_INTERVAL) last_send = 0 while True: bme_data = read_bme680(bme) pms_data = read_pms5003(pms) gps = dict(_gps_cache) reading = { "device_id": DEVICE_ID, "sampled_at": datetime.now(timezone.utc).isoformat(), "lat": gps.get("lat"), "lon": gps.get("lon"), "alt": gps.get("alt"), "gps_fix": gps.get("gps_fix", False), **bme_data, **pms_data, } now = time.time() if now - last_send >= SEND_INTERVAL: # Try live send first, buffer on failure try: flush_buffer() # drain any backlog first r = requests.post( f"{ADAMAPS_URL}/api/ingest/air", json={"device_id": DEVICE_ID, "readings": [reading]}, headers={"X-AdaMaps-Key": ADAMAPS_KEY}, timeout=10 ) if r.ok: log.info("Sent | PM2.5=%.1f lat=%s lon=%s", reading.get("pm2_5_ug_m3", 0), reading.get("lat"), reading.get("lon")) else: log.warning("Send failed %s — buffering", r.status_code) buffer_reading(reading) except Exception as e: log.warning("Offline (%s) — buffering", e) buffer_reading(reading) last_send = now time.sleep(1) if __name__ == "__main__": main()