Previous values (adamaps-ingest-2026, adamaps-read-2026, mapnet-ingest-2026) were inline defaults across adamaps, adacam-api and varroa. The ingest key was briefly visible to anonymous users during the 2026-05-27 Forgejo public flip, when adacam-api and varroa were public for a short window before the leak was spotted.
New values live in Vaultwarden:
- AdaMaps — API_KEY (ingest)
- AdaMaps — READ_KEY
Validators now hard-fail at boot if the env var is missing. The service is on hold today; when it resumes, both env vars must be set.
235 lines
8.4 KiB
Python
235 lines
8.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
air-aggregator.py — Blackbox Air Quality Service
|
|
Reads BME680 (I2C) + PMS5003 (UART), fuses with GPS from Bee API,
|
|
ships readings to AdaMaps ingest endpoint.
|
|
|
|
Hardware:
|
|
BME680 → I2C (SDA=GPIO2, SCL=GPIO3, addr=0x77)
|
|
PMS5003 → UART (/dev/serial0 or /dev/ttyS0, 9600 baud)
|
|
|
|
Dependencies:
|
|
pip install bme680 requests smbus2
|
|
PMS5003 is read manually (no dep needed)
|
|
|
|
Config via env vars or edit constants below.
|
|
"""
|
|
|
|
import os, time, json, struct, logging, threading
|
|
import serial
|
|
import smbus2
|
|
import bme680
|
|
import requests
|
|
from datetime import datetime, timezone
|
|
|
|
# ── Config ────────────────────────────────────────────────────────────────────
# Every setting can be overridden through an environment variable of the same
# name; the literals below are only fallbacks.
BEE_URL = os.environ.get("BEE_URL", "http://192.168.197.1:5000")
ADAMAPS_URL = os.environ.get("ADAMAPS_URL", "https://api.adamaps.org")
# The fallback here is a placeholder string; a real ingest key is expected to
# be supplied via the ADAMAPS_KEY environment variable.
ADAMAPS_KEY = os.environ.get("ADAMAPS_KEY", "<your-adamaps-ingest-key>")
DEVICE_ID = os.environ.get("DEVICE_ID", "blackbox-pi")
PMS_PORT = os.environ.get("PMS_PORT", "/dev/ttyS0")
SEND_INTERVAL = int(os.environ.get("SEND_INTERVAL", "10"))  # seconds
# Offline backlog file. Overridable so a deployment can point it somewhere
# other than /tmp; the default path is unchanged.
BUFFER_FILE = os.environ.get("BUFFER_FILE", "/tmp/air_buffer.json")
# ──────────────────────────────────────────────────────────────────────────────

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger("air-aggregator")
|
|
|
|
|
|
# ── BME680 ────────────────────────────────────────────────────────────────────
|
|
def init_bme680():
    """Open and configure the BME680 environmental sensor.

    The library's primary I2C address is tried first and the secondary one
    second. The module header documents the sensor at 0x77, which per the
    Pimoroni library documentation is the *secondary* address, so probing
    only the primary one would miss a board wired as documented.

    Returns:
        A configured ``bme680.BME680`` instance, or None if the sensor could
        not be opened or configured at either address (the error is logged).
    """
    try:
        try:
            sensor = bme680.BME680(bme680.I2C_ADDR_PRIMARY)
        except (RuntimeError, IOError):
            # Nothing usable at the primary address: retry on the secondary.
            sensor = bme680.BME680(bme680.I2C_ADDR_SECONDARY)

        # Oversampling and IIR filter settings for the T/H/P channels.
        sensor.set_humidity_oversample(bme680.OS_2X)
        sensor.set_pressure_oversample(bme680.OS_4X)
        sensor.set_temperature_oversample(bme680.OS_8X)
        sensor.set_filter(bme680.FILTER_SIZE_3)

        # Gas channel: enable it and use heater profile 0 (320 / 150 as
        # passed to the library's temperature and duration setters).
        sensor.set_gas_status(bme680.ENABLE_GAS_MEAS)
        sensor.set_gas_heater_temperature(320)
        sensor.set_gas_heater_duration(150)
        sensor.select_gas_heater_profile(0)

        log.info("BME680 initialized")
        return sensor
    except Exception as e:
        # Best effort: the aggregator keeps running without this sensor.
        log.error("BME680 init failed: %s", e)
        return None
|
|
|
|
def read_bme680(sensor):
    """Take one sample from the BME680.

    Args:
        sensor: The object returned by ``init_bme680`` (may be None).

    Returns:
        A dict with ``temperature_c``, ``humidity_pct``, ``pressure_hpa``
        (each rounded to two decimals) and ``gas_resistance_ohm`` (None
        unless the sensor reports ``heat_stable``). An empty dict is
        returned when there is no sensor, no fresh data, or the read raised.
    """
    if not sensor:
        return {}

    try:
        if not sensor.get_sensor_data():
            return {}

        sample = sensor.data
        values = {
            "temperature_c": round(sample.temperature, 2),
            "humidity_pct": round(sample.humidity, 2),
            "pressure_hpa": round(sample.pressure, 2),
        }
        # The gas value is only reported once the sensor flags it as stable.
        if sample.heat_stable:
            values["gas_resistance_ohm"] = sample.gas_resistance
        else:
            values["gas_resistance_ohm"] = None
        return values
    except Exception as e:
        log.debug("BME680 read error: %s", e)
        return {}
|
|
|
|
|
|
# ── PMS5003 ───────────────────────────────────────────────────────────────────
|
|
# Protocol: 32-byte frames, start bytes 0x42 0x4d
PMS_START = b'\x42\x4d'
PMS_FRAME = 32


def init_pms5003():
    """Open the serial port the PMS5003 is attached to.

    Returns:
        The open ``serial.Serial`` handle on ``PMS_PORT`` (9600 baud, 2 s
        read timeout), or None if the port could not be opened (the error
        is logged).
    """
    try:
        ser = serial.Serial(PMS_PORT, baudrate=9600, timeout=2)
        log.info("PMS5003 on %s", PMS_PORT)
        return ser
    except Exception as e:
        # Best effort: the aggregator keeps running without this sensor.
        log.error("PMS5003 init failed: %s", e)
        return None


def _sync_to_pms_frame(ser, max_bytes):
    """Consume bytes from *ser* until just after a PMS_START marker.

    Returns True once the two start bytes have been read back to back, and
    False if a read comes back empty (timeout / end of data) or *max_bytes*
    bytes were scanned without finding the marker.
    """
    previous = b""
    for _ in range(max_bytes):
        current = ser.read(1)
        if not current:
            return False
        if previous + current == PMS_START:
            return True
        # Keep the last byte so that "42 42 4d" still syncs on the second 42.
        previous = current
    return False


def read_pms5003(ser):
    """Read and decode one PMS5003 data frame.

    Frame layout after the two start bytes: a 2-byte length, 13 big-endian
    uint16 data words, and a 2-byte check code equal to the sum of every
    preceding byte of the frame (start bytes included).

    Args:
        ser: The object returned by ``init_pms5003`` (may be None). Anything
            with a ``read(n)`` method returning bytes works.

    Returns:
        A dict with the atmospheric PM1.0 / PM2.5 / PM10 concentrations and
        the 0.3 / 0.5 / 1.0 / 2.5 particle counts. An empty dict is returned
        when there is no port, no frame start is found within two frame
        lengths, the port times out, the frame is truncated, the check code
        does not match, or the read raised.
    """
    if not ser:
        return {}
    try:
        # Bounded sync: a silent or disconnected sensor must not hang the
        # caller forever. Two frame lengths always contain a full marker
        # when the sensor is actually streaming.
        if not _sync_to_pms_frame(ser, 2 * PMS_FRAME):
            return {}

        frame = ser.read(PMS_FRAME - 2)
        if len(frame) < PMS_FRAME - 2:
            return {}

        # Drop corrupted frames instead of reporting garbage values.
        (received,) = struct.unpack('>H', frame[-2:])
        computed = (sum(PMS_START) + sum(frame[:-2])) & 0xFFFF
        if received != computed:
            return {}

        # Parse: skip length (2 bytes), then 13 uint16s
        vals = struct.unpack('>13H', frame[2:28])
        return {
            "pm1_0_ug_m3": vals[3],  # atmospheric env
            "pm2_5_ug_m3": vals[4],
            "pm10_ug_m3": vals[5],
            "particles_0_3_per_dl": vals[6],
            "particles_0_5_per_dl": vals[7],
            "particles_1_0_per_dl": vals[8],
            "particles_2_5_per_dl": vals[9],
        }
    except Exception as e:
        log.debug("PMS5003 read error: %s", e)
        return {}
|
|
|
|
|
|
# ── GPS from Bee ──────────────────────────────────────────────────────────────
|
|
_gps_cache = {}


def _first_present(payload, *keys):
    """Return the first value in *payload* under *keys* that is not None."""
    for key in keys:
        value = payload.get(key)
        if value is not None:
            return value
    return None


def fetch_gps():
    """Fetch the latest GNSS fix from the Bee API and refresh ``_gps_cache``.

    On a successful response the cache is replaced with ``lat``, ``lon``,
    ``alt`` (each read from the short key or, failing that, the long
    ``latitude`` / ``longitude`` / ``altitude`` key) and ``gps_fix``, which
    is True only when both latitude and longitude are present. A value of
    0 / 0.0 counts as present. On an HTTP error status or any exception the
    last known coordinates are kept and ``gps_fix`` is set to False.

    Returns:
        The current cache dict.
    """
    global _gps_cache
    try:
        r = requests.get(f"{BEE_URL}/api/1/gnssConcise/latestValid", timeout=3)
        if r.ok:
            d = r.json()
            lat = _first_present(d, "lat", "latitude")
            lon = _first_present(d, "lon", "longitude")
            _gps_cache = {
                "lat": lat,
                "lon": lon,
                "alt": _first_present(d, "alt", "altitude"),
                "gps_fix": lat is not None and lon is not None,
            }
        else:
            log.debug("GPS fetch failed: HTTP %s", r.status_code)
            # Rebind rather than mutate so a reader copying the cache from
            # another thread never sees a half-updated dict.
            _gps_cache = {**_gps_cache, "gps_fix": False}
    except Exception as e:
        # Deliberately broad: this runs in a polling loop and one bad
        # response must not end it.
        log.debug("GPS fetch failed: %s", e)
        _gps_cache = {**_gps_cache, "gps_fix": False}
    return _gps_cache
|
|
|
|
def gps_poller():
    """Refresh the GPS cache in an endless loop.

    Calls ``fetch_gps`` and then pauses one second, forever; this function
    never returns.
    """
    pause_s = 1
    while True:
        fetch_gps()
        time.sleep(pause_s)
|
|
|
|
|
|
# ── Buffer ────────────────────────────────────────────────────────────────────
|
|
def buffer_reading(reading, path=None, max_entries=1000):
    """Append *reading* to the on-disk backlog of unsent readings.

    Args:
        reading: JSON-serializable reading to keep for a later flush.
        path: Backlog file; defaults to ``BUFFER_FILE``.
        max_entries: Positive cap on the backlog length; the oldest entries
            are dropped first. The default of 1000 is ~3 hours at a 10 s
            send interval.

    Raises:
        OSError: If the backlog file cannot be written.
    """
    path = BUFFER_FILE if path is None else path

    try:
        with open(path) as f:
            buf = json.load(f)
    except (OSError, ValueError):
        # Missing, unreadable or corrupt backlog: start a fresh one.
        buf = []
    if not isinstance(buf, list):
        # Valid JSON but not a backlog list; appending to it would crash.
        buf = []

    buf.append(reading)
    if len(buf) > max_entries:
        buf = buf[-max_entries:]

    # Write to a sibling file and rename over the target so an interrupted
    # write cannot leave a truncated backlog behind.
    tmp_path = f"{path}.tmp"
    with open(tmp_path, 'w') as f:
        json.dump(buf, f)
    os.replace(tmp_path, path)
|
|
|
|
def flush_buffer(path=None):
    """Send the on-disk backlog to the AdaMaps ingest endpoint in one request.

    Does nothing when the backlog file is missing, unreadable, not valid
    JSON, not a list, or empty. When the server answers with a success
    status the backlog file is reset to an empty list; otherwise it is left
    untouched for the next attempt. Network and file errors are logged,
    not raised.

    Args:
        path: Backlog file; defaults to ``BUFFER_FILE``.
    """
    path = BUFFER_FILE if path is None else path

    try:
        with open(path) as f:
            buf = json.load(f)
    except (OSError, ValueError):
        # No backlog, or one that cannot be parsed: nothing to send.
        return
    if not buf or not isinstance(buf, list):
        return

    try:
        r = requests.post(
            f"{ADAMAPS_URL}/api/ingest/air",
            json={"device_id": DEVICE_ID, "readings": buf},
            headers={"X-AdaMaps-Key": ADAMAPS_KEY},
            timeout=15,
        )
    except requests.RequestException as e:
        log.warning("Flush error: %s", e)
        return

    if not r.ok:
        log.warning("Flush failed: %s", r.status_code)
        return

    log.info("Flushed %d buffered readings", len(buf))
    try:
        with open(path, 'w') as f:
            json.dump([], f)
    except OSError as e:
        log.warning("Flush error: %s", e)
|
|
|
|
|
|
# ── Main loop ─────────────────────────────────────────────────────────────────
|
|
def _build_reading(bme_data, pms_data):
    """Merge the cached GPS fix and the sensor dicts into one reading."""
    gps = dict(_gps_cache)  # snapshot; the poller thread updates the cache
    return {
        "device_id": DEVICE_ID,
        "sampled_at": datetime.now(timezone.utc).isoformat(),
        "lat": gps.get("lat"),
        "lon": gps.get("lon"),
        "alt": gps.get("alt"),
        "gps_fix": gps.get("gps_fix", False),
        **bme_data,
        **pms_data,
    }


def _send_or_buffer(reading):
    """Drain the backlog, send *reading* live, and buffer it if that fails."""
    delivered = False
    try:
        flush_buffer()  # drain any backlog first
        r = requests.post(
            f"{ADAMAPS_URL}/api/ingest/air",
            json={"device_id": DEVICE_ID, "readings": [reading]},
            headers={"X-AdaMaps-Key": ADAMAPS_KEY},
            timeout=10,
        )
        if r.ok:
            delivered = True
            log.info("Sent | PM2.5=%.1f lat=%s lon=%s",
                     reading.get("pm2_5_ug_m3", 0),
                     reading.get("lat"), reading.get("lon"))
        else:
            log.warning("Send failed %s — buffering", r.status_code)
    except Exception as e:
        # Service boundary: any failure to deliver means "keep it for later".
        log.warning("Offline (%s) — buffering", e)

    if not delivered:
        # Buffered outside the handler above so that a failing disk write is
        # reported instead of escaping from an except block and ending the
        # service.
        try:
            buffer_reading(reading)
        except OSError as e:
            log.error("Could not buffer reading: %s", e)


def main():
    """Run the aggregator loop: read sensors every second, send periodically.

    Initializes both sensors, starts the GPS poller in a daemon thread, then
    loops forever. Both sensors are read on every iteration; once at least
    ``SEND_INTERVAL`` seconds have passed since the previous attempt, the
    current values are sent to AdaMaps, or buffered on failure.
    """
    bme = init_bme680()
    pms = init_pms5003()

    # GPS in background thread
    t = threading.Thread(target=gps_poller, daemon=True)
    t.start()

    log.info("Air aggregator started — sending every %ds", SEND_INTERVAL)

    # Monotonic clock: a wall-clock step backwards (e.g. a late time sync)
    # must not stall sending. None means "send on the first iteration".
    last_send = None

    while True:
        bme_data = read_bme680(bme)
        pms_data = read_pms5003(pms)

        now = time.monotonic()
        if last_send is None or now - last_send >= SEND_INTERVAL:
            # Try live send first, buffer on failure
            _send_or_buffer(_build_reading(bme_data, pms_data))
            last_send = now

        time.sleep(1)


if __name__ == "__main__":
    main()
|