145 lines
6.7 KiB
Python
145 lines
6.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
AIR QUALITY API — Routes to add to /app/app.py on Rackham (adamaps-api container)
|
|
Review this file, then apply via: docker exec adamaps-api python3 /tmp/apply_air_patch.py
|
|
DO NOT apply until Cobb approves.
|
|
"""
|
|
|
|
# ─── EPA AQI calculation ──────────────────────────────────────────────────────
|
|
def pm25_to_aqi(pm25):
    """Convert a PM2.5 concentration to an AQI value by linear interpolation.

    The concentration is truncated to one decimal place before the breakpoint
    lookup. The table rows are 0.1 apart (12.0 / 12.1, 35.4 / 35.5, ...), so
    without truncation a reading such as 12.05 matches no row and would be
    reported as AQI 0.

    Args:
        pm25: PM2.5 concentration as a number, or None.

    Returns:
        None when ``pm25`` is None or NaN; otherwise an int clamped to 0..500
        (negative readings give 0, readings above the table give 500).
    """
    if pm25 is None:
        return None
    if pm25 != pm25:
        # NaN compares unequal to itself; there is no meaningful AQI for it.
        return None
    if pm25 < 0:
        return 0
    if pm25 > 500.4:
        # Above the last breakpoint (this also covers +inf, which cannot be
        # truncated with int()).
        return 500

    # (conc_low, conc_high, aqi_low, aqi_high)
    # NOTE(review): these are the pre-2024 EPA PM2.5 breakpoints; confirm
    # whether the revised table should be adopted.
    breakpoints = (
        (0.0, 12.0, 0, 50),
        (12.1, 35.4, 51, 100),
        (35.5, 55.4, 101, 150),
        (55.5, 150.4, 151, 200),
        (150.5, 250.4, 201, 300),
        (250.5, 350.4, 301, 400),
        (350.5, 500.4, 401, 500),
    )

    # Truncate (not round) to 0.1. The tiny epsilon keeps inputs that are
    # already one-decimal values from slipping down a step due to binary
    # floating-point representation.
    conc = int(pm25 * 10 + 1e-9) / 10

    for c_lo, c_hi, i_lo, i_hi in breakpoints:
        if c_lo <= conc <= c_hi:
            return round((i_hi - i_lo) / (c_hi - c_lo) * (conc - c_lo) + i_lo)

    # Defensive fallback; every truncated value in 0..500.4 matches a row.
    return 500 if pm25 > 500 else 0
|
|
|
|
def init_air_table():
    """Create the ``air_quality`` table and its two indexes if they are missing.

    Best-effort: any failure is printed and swallowed so that callers are not
    interrupted. The connection obtained from ``get_db()`` is always closed,
    including when a statement fails part-way through.
    """
    try:
        conn = get_db()
        try:
            cur = conn.cursor()
            cur.execute("""
                CREATE TABLE IF NOT EXISTS air_quality (
                    id SERIAL PRIMARY KEY,
                    device_id VARCHAR(64),
                    sampled_at TIMESTAMP,
                    lat DOUBLE PRECISION,
                    lon DOUBLE PRECISION,
                    alt DOUBLE PRECISION,
                    gps_fix BOOLEAN DEFAULT FALSE,
                    pm1_0 FLOAT,
                    pm2_5 FLOAT,
                    pm10 FLOAT,
                    temperature_c FLOAT,
                    humidity_pct FLOAT,
                    pressure_hpa FLOAT,
                    gas_resistance_ohm FLOAT,
                    aqi INTEGER,
                    created_at TIMESTAMP DEFAULT NOW()
                )
            """)
            cur.execute("CREATE INDEX IF NOT EXISTS air_latlon_idx ON air_quality (lat, lon)")
            cur.execute("CREATE INDEX IF NOT EXISTS air_sampled_idx ON air_quality (sampled_at DESC)")
            conn.commit()
            cur.close()
        finally:
            # Runs on success and on failure, so a failed DDL statement does
            # not leave the connection open.
            conn.close()
    except Exception as e:
        print(f"air table init: {e}")
|
|
|
|
# ─── /api/ingest/air ──────────────────────────────────────────────────────────
|
|
# POST — auth required (X-AdaMaps-Key)
|
|
# Body: {"device_id": "blackbox-pi", "readings": [{sampled_at, lat, lon, pm2_5_ug_m3, ...}]}
|
|
# Returns: {"inserted": N, "device_id": "<top-level device_id>"}
|
|
def ingest_air():
    """Handle an air-quality ingest POST and insert its readings.

    The request must carry the shared key in the ``X-AdaMaps-Key`` header.
    The JSON body is either ``{"device_id": ..., "readings": [ {...}, ... ]}``
    or a single reading object (recognised by its ``sampled_at`` key).

    Each reading is inserted under its own savepoint, so a malformed reading
    is skipped without discarding the readings that were already accepted in
    the same request.

    Returns:
        ``{"inserted": N, "device_id": ...}`` on success;
        401 when the key is missing or wrong;
        400 when the body is not a JSON object or contains no readings;
        503 with ``{"error": "db_unavailable", "detail": ...}`` on a database
        failure.
    """
    import hmac

    supplied_key = request.headers.get("X-AdaMaps-Key")
    # Reject when either side is empty (a plain `!=` would let a request with
    # no header through if API_KEY were unset), and compare in constant time.
    if (
        not supplied_key
        or not API_KEY
        or not hmac.compare_digest(
            str(supplied_key).encode("utf-8"), str(API_KEY).encode("utf-8")
        )
    ):
        return jsonify({"error": "unauthorized"}), 401

    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({"error": "invalid"}), 400

    device_id = data.get("device_id", "unknown")
    # A body without "readings" is treated as one reading if it has sampled_at.
    readings = data.get("readings", [data] if "sampled_at" in data else [])
    if not readings or not isinstance(readings, list):
        return jsonify({"error": "no readings"}), 400

    init_air_table()
    inserted = 0
    try:
        conn = get_db()
        try:
            cur = conn.cursor()
            for r in readings:
                # Per-row savepoint: a failure undoes only this row instead of
                # rolling back the whole transaction.
                cur.execute("SAVEPOINT air_row")
                try:
                    # Explicit None check: 0.0 is a valid reading and must not
                    # fall through to the alternate key.
                    pm25 = r.get("pm2_5_ug_m3")
                    if pm25 is None:
                        pm25 = r.get("pm2_5")
                    cur.execute("""
                        INSERT INTO air_quality
                        (device_id, sampled_at, lat, lon, alt, gps_fix,
                        pm1_0, pm2_5, pm10, temperature_c, humidity_pct,
                        pressure_hpa, gas_resistance_ohm, aqi)
                        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                    """, (
                        r.get("device_id", device_id), r.get("sampled_at"),
                        r.get("lat"), r.get("lon"), r.get("alt"), r.get("gps_fix", False),
                        r.get("pm1_0_ug_m3"), pm25, r.get("pm10_ug_m3"),
                        r.get("temperature_c"), r.get("humidity_pct"),
                        r.get("pressure_hpa"), r.get("gas_resistance_ohm"),
                        pm25_to_aqi(pm25),
                    ))
                except Exception:
                    # Skip the bad reading (wrong shape, bad value, constraint
                    # error) and keep the rows inserted so far.
                    cur.execute("ROLLBACK TO SAVEPOINT air_row")
                else:
                    cur.execute("RELEASE SAVEPOINT air_row")
                    inserted += 1
            conn.commit()
            cur.close()
        finally:
            conn.close()
    except Exception as e:
        return jsonify({"error": "db_unavailable", "detail": str(e)}), 503
    return jsonify({"inserted": inserted, "device_id": device_id})
|
|
|
|
# ─── /api/air/heatmap ─────────────────────────────────────────────────────────
|
|
# GET ?metric=aqi|pm2_5 &hours=24
|
|
# Returns [[lat, lon, intensity_0_to_1], ...] for Leaflet.heat
|
|
def air_heatmap():
    """Return recent GPS-fixed readings as ``[lat, lon, intensity]`` triples.

    Query parameters:
        metric: ``"aqi"`` (default) selects the ``aqi`` column; any other
            value selects ``pm2_5``.
        hours: look-back window in hours (default 24; a value that does not
            parse as an int falls back to the default).

    Intensity is the selected value divided by a fixed ceiling (300 for AQI,
    150 for PM2.5) and capped at 1.0. At most 50000 rows are returned, newest
    first.

    Returns:
        A JSON list of triples, or ``{"error": ...}`` with status 500 on
        failure.
    """
    metric = request.args.get("metric", "aqi")
    hours = request.args.get("hours", 24, type=int)
    # `col` is only ever one of two fixed identifiers, so interpolating it
    # into the SQL text below cannot carry user input.
    col = "aqi" if metric == "aqi" else "pm2_5"
    max_val = 300.0 if metric == "aqi" else 150.0
    try:
        conn = get_db()
        try:
            cur = conn.cursor()
            # The bind parameter is kept outside the quoted interval literal
            # so it is passed as an ordinary value rather than relying on
            # substitution inside a string.
            cur.execute(f"""
                SELECT lat, lon, {col} FROM air_quality
                WHERE sampled_at > NOW() - %s * INTERVAL '1 hour'
                AND lat IS NOT NULL AND lon IS NOT NULL
                AND {col} IS NOT NULL AND gps_fix = TRUE
                ORDER BY sampled_at DESC LIMIT 50000
            """, (hours,))
            rows = [
                [float(lat), float(lon), min(float(value) / max_val, 1.0)]
                for lat, lon, value in cur.fetchall()
            ]
            cur.close()
        finally:
            # Always release the connection, including when the query fails.
            conn.close()
        return jsonify(rows)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
|
|
# ─── /api/air/latest ──────────────────────────────────────────────────────────
|
|
# GET — most recent reading per device, limited to readings sampled within the last hour
|
|
def air_latest():
    """Return the newest reading per device from the last hour.

    Uses ``SELECT DISTINCT ON (device_id)`` ordered by ``sampled_at DESC`` so
    each device contributes exactly one row.

    Returns:
        A JSON list of objects with keys ``device_id``, ``sampled_at`` (ISO
        string or null), ``lat``, ``lon``, ``pm1_0``, ``pm2_5``, ``pm10``,
        ``temperature_c``, ``humidity_pct`` and ``aqi``; or ``{"error": ...}``
        with status 500 on failure.
    """
    try:
        conn = get_db()
        try:
            cur = conn.cursor()
            cur.execute("""
                SELECT DISTINCT ON (device_id)
                device_id, sampled_at, lat, lon,
                pm1_0, pm2_5, pm10, temperature_c, humidity_pct, aqi
                FROM air_quality
                WHERE sampled_at > NOW() - INTERVAL '1 hour'
                ORDER BY device_id, sampled_at DESC
            """)
            rows = []
            for r in cur.fetchall():
                rows.append({
                    "device_id": r[0],
                    "sampled_at": r[1].isoformat() if r[1] else None,
                    # `is not None` rather than truthiness: a coordinate of
                    # exactly 0.0 is valid and must not be reported as null.
                    "lat": float(r[2]) if r[2] is not None else None,
                    "lon": float(r[3]) if r[3] is not None else None,
                    "pm1_0": r[4], "pm2_5": r[5], "pm10": r[6],
                    "temperature_c": r[7], "humidity_pct": r[8], "aqi": r[9],
                })
            cur.close()
        finally:
            # Always release the connection, including when the query fails.
            conn.close()
        return jsonify(rows)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
|
|
# ─── Flask route registration (add to app after existing routes) ───────────────
|
|
# app.add_url_rule("/api/ingest/air", "ingest_air", ingest_air, methods=["POST"])
|
|
# app.add_url_rule("/api/air/heatmap", "air_heatmap", air_heatmap, methods=["GET"])
|
|
# app.add_url_rule("/api/air/latest", "air_latest", air_latest, methods=["GET"])
|