#!/usr/bin/env python3 """ AIR QUALITY API — Routes to add to /app/app.py on Rackham (adamaps-api container) Review this file, then apply via: docker exec adamaps-api python3 /tmp/apply_air_patch.py DO NOT apply until Cobb approves. """ # ─── EPA AQI calculation ────────────────────────────────────────────────────── def pm25_to_aqi(pm25): if pm25 is None: return None breakpoints = [ (0.0, 12.0, 0, 50), (12.1, 35.4, 51, 100), (35.5, 55.4, 101, 150), (55.5, 150.4, 151, 200), (150.5, 250.4, 201, 300), (250.5, 350.4, 301, 400), (350.5, 500.4, 401, 500), ] for c_lo, c_hi, i_lo, i_hi in breakpoints: if c_lo <= pm25 <= c_hi: return round((i_hi - i_lo) / (c_hi - c_lo) * (pm25 - c_lo) + i_lo) return 500 if pm25 > 500 else 0 def init_air_table(): try: conn = get_db() cur = conn.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS air_quality ( id SERIAL PRIMARY KEY, device_id VARCHAR(64), sampled_at TIMESTAMP, lat DOUBLE PRECISION, lon DOUBLE PRECISION, alt DOUBLE PRECISION, gps_fix BOOLEAN DEFAULT FALSE, pm1_0 FLOAT, pm2_5 FLOAT, pm10 FLOAT, temperature_c FLOAT, humidity_pct FLOAT, pressure_hpa FLOAT, gas_resistance_ohm FLOAT, aqi INTEGER, created_at TIMESTAMP DEFAULT NOW() ) """) cur.execute("CREATE INDEX IF NOT EXISTS air_latlon_idx ON air_quality (lat, lon)") cur.execute("CREATE INDEX IF NOT EXISTS air_sampled_idx ON air_quality (sampled_at DESC)") conn.commit(); cur.close(); conn.close() except Exception as e: print(f"air table init: {e}") # ─── /api/ingest/air ────────────────────────────────────────────────────────── # POST — auth required (X-AdaMaps-Key) # Body: {"device_id": "blackbox-pi", "readings": [{sampled_at, lat, lon, pm2_5_ug_m3, ...}]} # Returns: {"inserted": N} def ingest_air(): if request.headers.get("X-AdaMaps-Key") != API_KEY: return jsonify({"error": "unauthorized"}), 401 data = request.json if not data: return jsonify({"error": "invalid"}), 400 device_id = data.get("device_id", "unknown") readings = data.get("readings", [data] if "sampled_at" in data else []) if not readings: return jsonify({"error": "no readings"}), 400 init_air_table() inserted = 0 try: conn = get_db(); cur = conn.cursor() for r in readings: try: pm25 = r.get("pm2_5_ug_m3") or r.get("pm2_5") cur.execute(""" INSERT INTO air_quality (device_id, sampled_at, lat, lon, alt, gps_fix, pm1_0, pm2_5, pm10, temperature_c, humidity_pct, pressure_hpa, gas_resistance_ohm, aqi) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) """, ( r.get("device_id", device_id), r.get("sampled_at"), r.get("lat"), r.get("lon"), r.get("alt"), r.get("gps_fix", False), r.get("pm1_0_ug_m3"), pm25, r.get("pm10_ug_m3"), r.get("temperature_c"), r.get("humidity_pct"), r.get("pressure_hpa"), r.get("gas_resistance_ohm"), pm25_to_aqi(pm25) )) inserted += 1 except: conn.rollback() conn.commit(); cur.close(); conn.close() except Exception as e: return jsonify({"error": "db_unavailable", "detail": str(e)}), 503 return jsonify({"inserted": inserted, "device_id": device_id}) # ─── /api/air/heatmap ───────────────────────────────────────────────────────── # GET ?metric=aqi|pm2_5 &hours=24 # Returns [[lat, lon, intensity_0_to_1], ...] for Leaflet.heat def air_heatmap(): metric = request.args.get("metric", "aqi") hours = request.args.get("hours", 24, type=int) col = "aqi" if metric == "aqi" else "pm2_5" max_val = 300.0 if metric == "aqi" else 150.0 try: conn = get_db(); cur = conn.cursor() cur.execute(f""" SELECT lat, lon, {col} FROM air_quality WHERE sampled_at > NOW() - INTERVAL '%s hours' AND lat IS NOT NULL AND lon IS NOT NULL AND {col} IS NOT NULL AND gps_fix = TRUE ORDER BY sampled_at DESC LIMIT 50000 """, (hours,)) rows = [[float(r[0]), float(r[1]), min(float(r[2]) / max_val, 1.0)] for r in cur.fetchall()] cur.close(); conn.close() return jsonify(rows) except Exception as e: return jsonify({"error": str(e)}), 500 # ─── /api/air/latest ────────────────────────────────────────────────────────── # GET — most recent reading per device def air_latest(): try: conn = get_db(); cur = conn.cursor() cur.execute(""" SELECT DISTINCT ON (device_id) device_id, sampled_at, lat, lon, pm1_0, pm2_5, pm10, temperature_c, humidity_pct, aqi FROM air_quality WHERE sampled_at > NOW() - INTERVAL '1 hour' ORDER BY device_id, sampled_at DESC """) rows = [{"device_id": r[0], "sampled_at": r[1].isoformat() if r[1] else None, "lat": float(r[2]) if r[2] else None, "lon": float(r[3]) if r[3] else None, "pm1_0": r[4], "pm2_5": r[5], "pm10": r[6], "temperature_c": r[7], "humidity_pct": r[8], "aqi": r[9]} for r in cur.fetchall()] cur.close(); conn.close() return jsonify(rows) except Exception as e: return jsonify({"error": str(e)}), 500 # ─── Flask route registration (add to app after existing routes) ─────────────── # app.add_url_rule("/api/ingest/air", "ingest_air", ingest_air, methods=["POST"]) # app.add_url_rule("/api/air/heatmap", "air_heatmap", air_heatmap, methods=["GET"]) # app.add_url_rule("/api/air/latest", "air_latest", air_latest, methods=["GET"])