blackbox: air quality aggregator (Pi 3B+ → AdaMaps)

This commit is contained in:
Kayos 2026-03-13 07:37:54 -07:00
parent 6b39f4ea23
commit 28cacd540f
2 changed files with 256 additions and 0 deletions

View file

@ -0,0 +1,21 @@
[Unit]
Description=Blackbox Air Quality Aggregator
After=network.target
[Service]
Type=simple
User=pi
Restart=always
RestartSec=10
ExecStart=/usr/bin/python3 /home/pi/air-aggregator.py
Environment=BEE_URL=http://192.168.197.1:5000
Environment=ADAMAPS_URL=https://api.adamaps.org
Environment=ADAMAPS_KEY=adamaps-ingest-2026
Environment=DEVICE_ID=blackbox-pi
Environment=PMS_PORT=/dev/ttyS0
Environment=SEND_INTERVAL=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target

235
blackbox/air_aggregator.py Normal file
View file

@ -0,0 +1,235 @@
#!/usr/bin/env python3
"""
air-aggregator.py Blackbox Air Quality Service
Reads BME680 (I2C) + PMS5003 (UART), fuses with GPS from Bee API,
ships readings to AdaMaps ingest endpoint.
Hardware:
BME680 I2C (SDA=GPIO2, SCL=GPIO3, addr=0x77)
PMS5003 UART (/dev/serial0 or /dev/ttyS0, 9600 baud)
Dependencies:
pip install bme680 requests smbus2
PMS5003 is read manually (no dep needed)
Config via env vars or edit constants below.
"""
import os, time, json, struct, logging, threading
import serial
import smbus2
import bme680
import requests
from datetime import datetime, timezone
# ── Config ────────────────────────────────────────────────────────────────────
BEE_URL = os.environ.get("BEE_URL", "http://192.168.197.1:5000")
ADAMAPS_URL = os.environ.get("ADAMAPS_URL", "https://api.adamaps.org")
ADAMAPS_KEY = os.environ.get("ADAMAPS_KEY", "adamaps-ingest-2026")
DEVICE_ID = os.environ.get("DEVICE_ID", "blackbox-pi")
PMS_PORT = os.environ.get("PMS_PORT", "/dev/ttyS0")
SEND_INTERVAL = int(os.environ.get("SEND_INTERVAL", "10")) # seconds
BUFFER_FILE = "/tmp/air_buffer.json"
# ──────────────────────────────────────────────────────────────────────────────
logging.basicConfig(level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger("air-aggregator")
# ── BME680 ────────────────────────────────────────────────────────────────────
def init_bme680():
try:
sensor = bme680.BME680(bme680.I2C_ADDR_PRIMARY)
sensor.set_humidity_oversample(bme680.OS_2X)
sensor.set_pressure_oversample(bme680.OS_4X)
sensor.set_temperature_oversample(bme680.OS_8X)
sensor.set_filter(bme680.FILTER_SIZE_3)
sensor.set_gas_status(bme680.ENABLE_GAS_MEAS)
sensor.set_gas_heater_temperature(320)
sensor.set_gas_heater_duration(150)
sensor.select_gas_heater_profile(0)
log.info("BME680 initialized")
return sensor
except Exception as e:
log.error("BME680 init failed: %s", e)
return None
def read_bme680(sensor):
if not sensor:
return {}
try:
if sensor.get_sensor_data():
return {
"temperature_c": round(sensor.data.temperature, 2),
"humidity_pct": round(sensor.data.humidity, 2),
"pressure_hpa": round(sensor.data.pressure, 2),
"gas_resistance_ohm": sensor.data.gas_resistance if sensor.data.heat_stable else None,
}
except Exception as e:
log.debug("BME680 read error: %s", e)
return {}
# ── PMS5003 ───────────────────────────────────────────────────────────────────
# Protocol: 32-byte frames, start bytes 0x42 0x4d
PMS_START = b'\x42\x4d'
PMS_FRAME = 32
def init_pms5003():
try:
ser = serial.Serial(PMS_PORT, baudrate=9600, timeout=2)
log.info("PMS5003 on %s", PMS_PORT)
return ser
except Exception as e:
log.error("PMS5003 init failed: %s", e)
return None
def read_pms5003(ser):
if not ser:
return {}
try:
# Sync to frame start
while True:
b = ser.read(1)
if b == b'\x42':
b2 = ser.read(1)
if b2 == b'\x4d':
break
frame = ser.read(PMS_FRAME - 2)
if len(frame) < PMS_FRAME - 2:
return {}
# Parse: skip length (2 bytes), then 13 uint16s
vals = struct.unpack('>13H', frame[2:28])
return {
"pm1_0_ug_m3": vals[3], # atmospheric env
"pm2_5_ug_m3": vals[4],
"pm10_ug_m3": vals[5],
"particles_0_3_per_dl": vals[6],
"particles_0_5_per_dl": vals[7],
"particles_1_0_per_dl": vals[8],
"particles_2_5_per_dl": vals[9],
}
except Exception as e:
log.debug("PMS5003 read error: %s", e)
return {}
# ── GPS from Bee ──────────────────────────────────────────────────────────────
_gps_cache = {}
def fetch_gps():
global _gps_cache
try:
r = requests.get(f"{BEE_URL}/api/1/gnssConcise/latestValid", timeout=3)
if r.ok:
d = r.json()
_gps_cache = {
"lat": d.get("lat") or d.get("latitude"),
"lon": d.get("lon") or d.get("longitude"),
"alt": d.get("alt") or d.get("altitude"),
"gps_fix": True,
}
except Exception as e:
log.debug("GPS fetch failed: %s", e)
_gps_cache["gps_fix"] = False
return _gps_cache
def gps_poller():
while True:
fetch_gps()
time.sleep(1)
# ── Buffer ────────────────────────────────────────────────────────────────────
def buffer_reading(reading):
buf = []
try:
with open(BUFFER_FILE) as f:
buf = json.load(f)
except: pass
buf.append(reading)
# Cap buffer at 1000 readings (~3 hours at 10s interval)
if len(buf) > 1000:
buf = buf[-1000:]
with open(BUFFER_FILE, 'w') as f:
json.dump(buf, f)
def flush_buffer():
try:
with open(BUFFER_FILE) as f:
buf = json.load(f)
except: return
if not buf: return
try:
r = requests.post(
f"{ADAMAPS_URL}/api/ingest/air",
json={"device_id": DEVICE_ID, "readings": buf},
headers={"X-AdaMaps-Key": ADAMAPS_KEY},
timeout=15
)
if r.ok:
log.info("Flushed %d buffered readings", len(buf))
with open(BUFFER_FILE, 'w') as f:
json.dump([], f)
else:
log.warning("Flush failed: %s", r.status_code)
except Exception as e:
log.warning("Flush error: %s", e)
# ── Main loop ─────────────────────────────────────────────────────────────────
def main():
bme = init_bme680()
pms = init_pms5003()
# GPS in background thread
t = threading.Thread(target=gps_poller, daemon=True)
t.start()
log.info("Air aggregator started — sending every %ds", SEND_INTERVAL)
last_send = 0
while True:
bme_data = read_bme680(bme)
pms_data = read_pms5003(pms)
gps = dict(_gps_cache)
reading = {
"device_id": DEVICE_ID,
"sampled_at": datetime.now(timezone.utc).isoformat(),
"lat": gps.get("lat"),
"lon": gps.get("lon"),
"alt": gps.get("alt"),
"gps_fix": gps.get("gps_fix", False),
**bme_data,
**pms_data,
}
now = time.time()
if now - last_send >= SEND_INTERVAL:
# Try live send first, buffer on failure
try:
flush_buffer() # drain any backlog first
r = requests.post(
f"{ADAMAPS_URL}/api/ingest/air",
json={"device_id": DEVICE_ID, "readings": [reading]},
headers={"X-AdaMaps-Key": ADAMAPS_KEY},
timeout=10
)
if r.ok:
log.info("Sent | PM2.5=%.1f lat=%s lon=%s",
reading.get("pm2_5_ug_m3", 0),
reading.get("lat"), reading.get("lon"))
else:
log.warning("Send failed %s — buffering", r.status_code)
buffer_reading(reading)
except Exception as e:
log.warning("Offline (%s) — buffering", e)
buffer_reading(reading)
last_send = now
time.sleep(1)
if __name__ == "__main__":
main()