From 28c65660680947181d5ab51b0dd149f07bbbc729 Mon Sep 17 00:00:00 2001 From: Kayos Date: Fri, 13 Mar 2026 07:37:54 -0700 Subject: [PATCH] =?UTF-8?q?blackbox:=20air=20quality=20aggregator=20(Pi=20?= =?UTF-8?q?3B+=20=E2=86=92=20AdaMaps)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- blackbox/air-aggregator.service | 21 +++ blackbox/air_aggregator.py | 235 ++++++++++++++++++++++++++++++++ 2 files changed, 256 insertions(+) create mode 100644 blackbox/air-aggregator.service create mode 100644 blackbox/air_aggregator.py diff --git a/blackbox/air-aggregator.service b/blackbox/air-aggregator.service new file mode 100644 index 0000000..ebeae22 --- /dev/null +++ b/blackbox/air-aggregator.service @@ -0,0 +1,21 @@ +[Unit] +Description=Blackbox Air Quality Aggregator +After=network.target + +[Service] +Type=simple +User=pi +Restart=always +RestartSec=10 +ExecStart=/usr/bin/python3 /home/pi/air-aggregator.py +Environment=BEE_URL=http://192.168.197.1:5000 +Environment=ADAMAPS_URL=https://api.adamaps.org +Environment=ADAMAPS_KEY=***REMOVED*** +Environment=DEVICE_ID=blackbox-pi +Environment=PMS_PORT=/dev/ttyS0 +Environment=SEND_INTERVAL=10 +StandardOutput=journal +StandardError=journal + +[Install] +WantedBy=multi-user.target diff --git a/blackbox/air_aggregator.py b/blackbox/air_aggregator.py new file mode 100644 index 0000000..1370947 --- /dev/null +++ b/blackbox/air_aggregator.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python3 +""" +air-aggregator.py — Blackbox Air Quality Service +Reads BME680 (I2C) + PMS5003 (UART), fuses with GPS from Bee API, +ships readings to AdaMaps ingest endpoint. + +Hardware: + BME680 → I2C (SDA=GPIO2, SCL=GPIO3, addr=0x77) + PMS5003 → UART (/dev/serial0 or /dev/ttyS0, 9600 baud) + +Dependencies: + pip install bme680 requests smbus2 + PMS5003 is read manually (no dep needed) + +Config via env vars or edit constants below. +""" + +import os, time, json, struct, logging, threading +import serial +import smbus2 +import bme680 +import requests +from datetime import datetime, timezone + +# ── Config ──────────────────────────────────────────────────────────────────── +BEE_URL = os.environ.get("BEE_URL", "http://192.168.197.1:5000") +ADAMAPS_URL = os.environ.get("ADAMAPS_URL", "https://api.adamaps.org") +ADAMAPS_KEY = os.environ.get("ADAMAPS_KEY", "***REMOVED***") +DEVICE_ID = os.environ.get("DEVICE_ID", "blackbox-pi") +PMS_PORT = os.environ.get("PMS_PORT", "/dev/ttyS0") +SEND_INTERVAL = int(os.environ.get("SEND_INTERVAL", "10")) # seconds +BUFFER_FILE = "/tmp/air_buffer.json" +# ────────────────────────────────────────────────────────────────────────────── + +logging.basicConfig(level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s") +log = logging.getLogger("air-aggregator") + + +# ── BME680 ──────────────────────────────────────────────────────────────────── +def init_bme680(): + try: + sensor = bme680.BME680(bme680.I2C_ADDR_PRIMARY) + sensor.set_humidity_oversample(bme680.OS_2X) + sensor.set_pressure_oversample(bme680.OS_4X) + sensor.set_temperature_oversample(bme680.OS_8X) + sensor.set_filter(bme680.FILTER_SIZE_3) + sensor.set_gas_status(bme680.ENABLE_GAS_MEAS) + sensor.set_gas_heater_temperature(320) + sensor.set_gas_heater_duration(150) + sensor.select_gas_heater_profile(0) + log.info("BME680 initialized") + return sensor + except Exception as e: + log.error("BME680 init failed: %s", e) + return None + +def read_bme680(sensor): + if not sensor: + return {} + try: + if sensor.get_sensor_data(): + return { + "temperature_c": round(sensor.data.temperature, 2), + "humidity_pct": round(sensor.data.humidity, 2), + "pressure_hpa": round(sensor.data.pressure, 2), + "gas_resistance_ohm": sensor.data.gas_resistance if sensor.data.heat_stable else None, + } + except Exception as e: + log.debug("BME680 read error: %s", e) + return {} + + +# ── PMS5003 ─────────────────────────────────────────────────────────────────── +# Protocol: 32-byte frames, start bytes 0x42 0x4d +PMS_START = b'\x42\x4d' +PMS_FRAME = 32 + +def init_pms5003(): + try: + ser = serial.Serial(PMS_PORT, baudrate=9600, timeout=2) + log.info("PMS5003 on %s", PMS_PORT) + return ser + except Exception as e: + log.error("PMS5003 init failed: %s", e) + return None + +def read_pms5003(ser): + if not ser: + return {} + try: + # Sync to frame start + while True: + b = ser.read(1) + if b == b'\x42': + b2 = ser.read(1) + if b2 == b'\x4d': + break + frame = ser.read(PMS_FRAME - 2) + if len(frame) < PMS_FRAME - 2: + return {} + # Parse: skip length (2 bytes), then 13 uint16s + vals = struct.unpack('>13H', frame[2:28]) + return { + "pm1_0_ug_m3": vals[3], # atmospheric env + "pm2_5_ug_m3": vals[4], + "pm10_ug_m3": vals[5], + "particles_0_3_per_dl": vals[6], + "particles_0_5_per_dl": vals[7], + "particles_1_0_per_dl": vals[8], + "particles_2_5_per_dl": vals[9], + } + except Exception as e: + log.debug("PMS5003 read error: %s", e) + return {} + + +# ── GPS from Bee ────────────────────────────────────────────────────────────── +_gps_cache = {} + +def fetch_gps(): + global _gps_cache + try: + r = requests.get(f"{BEE_URL}/api/1/gnssConcise/latestValid", timeout=3) + if r.ok: + d = r.json() + _gps_cache = { + "lat": d.get("lat") or d.get("latitude"), + "lon": d.get("lon") or d.get("longitude"), + "alt": d.get("alt") or d.get("altitude"), + "gps_fix": True, + } + except Exception as e: + log.debug("GPS fetch failed: %s", e) + _gps_cache["gps_fix"] = False + return _gps_cache + +def gps_poller(): + while True: + fetch_gps() + time.sleep(1) + + +# ── Buffer ──────────────────────────────────────────────────────────────────── +def buffer_reading(reading): + buf = [] + try: + with open(BUFFER_FILE) as f: + buf = json.load(f) + except: pass + buf.append(reading) + # Cap buffer at 1000 readings (~3 hours at 10s interval) + if len(buf) > 1000: + buf = buf[-1000:] + with open(BUFFER_FILE, 'w') as f: + json.dump(buf, f) + +def flush_buffer(): + try: + with open(BUFFER_FILE) as f: + buf = json.load(f) + except: return + if not buf: return + try: + r = requests.post( + f"{ADAMAPS_URL}/api/ingest/air", + json={"device_id": DEVICE_ID, "readings": buf}, + headers={"X-AdaMaps-Key": ADAMAPS_KEY}, + timeout=15 + ) + if r.ok: + log.info("Flushed %d buffered readings", len(buf)) + with open(BUFFER_FILE, 'w') as f: + json.dump([], f) + else: + log.warning("Flush failed: %s", r.status_code) + except Exception as e: + log.warning("Flush error: %s", e) + + +# ── Main loop ───────────────────────────────────────────────────────────────── +def main(): + bme = init_bme680() + pms = init_pms5003() + + # GPS in background thread + t = threading.Thread(target=gps_poller, daemon=True) + t.start() + + log.info("Air aggregator started — sending every %ds", SEND_INTERVAL) + last_send = 0 + + while True: + bme_data = read_bme680(bme) + pms_data = read_pms5003(pms) + gps = dict(_gps_cache) + + reading = { + "device_id": DEVICE_ID, + "sampled_at": datetime.now(timezone.utc).isoformat(), + "lat": gps.get("lat"), + "lon": gps.get("lon"), + "alt": gps.get("alt"), + "gps_fix": gps.get("gps_fix", False), + **bme_data, + **pms_data, + } + + now = time.time() + if now - last_send >= SEND_INTERVAL: + # Try live send first, buffer on failure + try: + flush_buffer() # drain any backlog first + r = requests.post( + f"{ADAMAPS_URL}/api/ingest/air", + json={"device_id": DEVICE_ID, "readings": [reading]}, + headers={"X-AdaMaps-Key": ADAMAPS_KEY}, + timeout=10 + ) + if r.ok: + log.info("Sent | PM2.5=%.1f lat=%s lon=%s", + reading.get("pm2_5_ug_m3", 0), + reading.get("lat"), reading.get("lon")) + else: + log.warning("Send failed %s — buffering", r.status_code) + buffer_reading(reading) + except Exception as e: + log.warning("Offline (%s) — buffering", e) + buffer_reading(reading) + last_send = now + + time.sleep(1) + +if __name__ == "__main__": + main()