import gradio as gr import requests import pandas as pd import folium from folium.plugins import MarkerCluster import tempfile import os import json import time from concurrent.futures import ThreadPoolExecutor, as_completed # Get API credentials from environment variables EPA_AQS_API_BASE_URL = "https://aqs.epa.gov/data/api" EMAIL = os.environ.get("EPA_AQS_EMAIL", "") # Get from environment variable API_KEY = os.environ.get("EPA_AQS_API_KEY", "") # Get from environment variable class AirQualityApp: def __init__(self): self.states = { "AL": "Alabama", "AK": "Alaska", "AZ": "Arizona", "AR": "Arkansas", "CA": "California", "CO": "Colorado", "CT": "Connecticut", "DE": "Delaware", "FL": "Florida", "GA": "Georgia", "HI": "Hawaii", "ID": "Idaho", "IL": "Illinois", "IN": "Indiana", "IA": "Iowa", "KS": "Kansas", "KY": "Kentucky", "LA": "Louisiana", "ME": "Maine", "MD": "Maryland", "MA": "Massachusetts", "MI": "Michigan", "MN": "Minnesota", "MS": "Mississippi", "MO": "Missouri", "MT": "Montana", "NE": "Nebraska", "NV": "Nevada", "NH": "New Hampshire", "NJ": "New Jersey", "NM": "New Mexico", "NY": "New York", "NC": "North Carolina", "ND": "North Dakota", "OH": "Ohio", "OK": "Oklahoma", "OR": "Oregon", "PA": "Pennsylvania", "RI": "Rhode Island", "SC": "South Carolina", "SD": "South Dakota", "TN": "Tennessee", "TX": "Texas", "UT": "Utah", "VT": "Vermont", "VA": "Virginia", "WA": "Washington", "WV": "West Virginia", "WI": "Wisconsin", "WY": "Wyoming", "DC": "District of Columbia" } # Mapping from two-letter state codes to numeric state codes for API self.state_code_mapping = { "AL": "01", "AK": "02", "AZ": "04", "AR": "05", "CA": "06", "CO": "08", "CT": "09", "DE": "10", "FL": "12", "GA": "13", "HI": "15", "ID": "16", "IL": "17", "IN": "18", "IA": "19", "KS": "20", "KY": "21", "LA": "22", "ME": "23", "MD": "24", "MA": "25", "MI": "26", "MN": "27", "MS": "28", "MO": "29", "MT": "30", "NE": "31", "NV": "32", "NH": "33", "NJ": "34", "NM": "35", "NY": "36", "NC": "37", "ND": "38", "OH": "39", "OK": "40", "OR": "41", "PA": "42", "RI": "44", "SC": "45", "SD": "46", "TN": "47", "TX": "48", "UT": "49", "VT": "50", "VA": "51", "WA": "53", "WV": "54", "WI": "55", "WY": "56", "DC": "11" } # AQI categories with their corresponding colors - using only valid Folium icon colors self.aqi_categories = { "Good": "green", "Moderate": "orange", "Unhealthy for Sensitive Groups": "orange", "Unhealthy": "red", "Very Unhealthy": "purple", "Hazardous": "darkred" } # Color mapping for the legend (using original colors for display) self.aqi_legend_colors = { "Good": "#00e400", # Green "Moderate": "#ffff00", # Yellow "Unhealthy for Sensitive Groups": "#ff7e00", # Orange "Unhealthy": "#ff0000", # Red "Very Unhealthy": "#99004c", # Purple "Hazardous": "#7e0023" # Maroon } # Cache for storing monitored data self.all_monitors_cache = {} self.all_aqi_data_cache = {} # Load data on initialization print("Initializing and loading all monitors data...") self.load_all_monitors() print("Loading AQI data...") self.load_all_aqi_data() print("Initialization complete.") def load_all_monitors(self): """Load monitors data for all states""" # If we don't have API credentials, use mock data if not EMAIL or not API_KEY: for state_code in self.states.keys(): self.all_monitors_cache[state_code] = self.mock_get_monitors(state_code) return # With API credentials, load data for all states using multithreading with ThreadPoolExecutor(max_workers=5) as executor: future_to_state = {executor.submit(self.get_monitors, state_code): state_code for state_code in self.states.keys()} for future in as_completed(future_to_state): state_code = future_to_state[future] try: result = future.result() self.all_monitors_cache[state_code] = result print(f"Loaded {len(result)} monitors for {state_code}") except Exception as e: print(f"Error loading monitors for {state_code}: {e}") # Fall back to mock data self.all_monitors_cache[state_code] = self.mock_get_monitors(state_code) # Sleep briefly to avoid overwhelming the API time.sleep(0.5) def load_all_aqi_data(self): """Load AQI data for all states""" # If we don't have API credentials, use mock data if not EMAIL or not API_KEY: for state_code in self.states.keys(): self.all_aqi_data_cache[state_code] = self._generate_mock_aqi_data(state_code) return # With API credentials, load data for all states using multithreading with ThreadPoolExecutor(max_workers=5) as executor: future_to_state = {executor.submit(self.get_latest_aqi, state_code): state_code for state_code in self.states.keys()} for future in as_completed(future_to_state): state_code = future_to_state[future] try: result = future.result() self.all_aqi_data_cache[state_code] = result print(f"Loaded {len(result)} AQI readings for {state_code}") except Exception as e: print(f"Error loading AQI data for {state_code}: {e}") # Fall back to mock data self.all_aqi_data_cache[state_code] = self._generate_mock_aqi_data(state_code) # Sleep briefly to avoid overwhelming the API time.sleep(0.5) def _generate_mock_aqi_data(self, state_code): """Generate mock AQI data for a state""" import random from datetime import datetime, timedelta aqi_data = [] # Get numeric state code numeric_state_code = self.state_code_mapping.get(state_code, "01") # Make mock data for our standard states if state_code in ["CA", "NY", "TX"]: # Generate data for the most recent 7 days for days_ago in range(7): # Generate date date = (datetime.now() - timedelta(days=days_ago)).strftime("%Y-%m-%d") # Get monitors for this state from cache monitors = self.all_monitors_cache.get(state_code, self.mock_get_monitors(state_code)) # Generate AQI data for each monitor for monitor in monitors: county_code = monitor.get("county_code", "001") site_number = monitor.get("site_number", "0001") parameter_code = monitor.get("parameter_code", "88101") parameter_name = monitor.get("parameter_name", "PM2.5 - Local Conditions") # Generate random AQI value (between 0 and 300) aqi_value = random.randint(0, 300) aqi_data.append({ "state_code": numeric_state_code, "county_code": county_code, "site_number": site_number, "parameter_code": parameter_code, "parameter_name": parameter_name, "date_local": date, "aqi": aqi_value }) else: # For other states, generate minimal data # Current date date = datetime.now().strftime("%Y-%m-%d") # Make 2 fake monitors with random AQI values aqi_data.append({ "state_code": numeric_state_code, "county_code": "001", "site_number": "0001", "parameter_code": "88101", "parameter_name": "PM2.5 - Local Conditions", "date_local": date, "aqi": random.randint(0, 300) }) aqi_data.append({ "state_code": numeric_state_code, "county_code": "001", "site_number": "0002", "parameter_code": "44201", "parameter_name": "Ozone", "date_local": date, "aqi": random.randint(0, 300) }) return aqi_data def get_monitors(self, state_code, county_code=None, parameter_code=None): """Fetch monitoring stations for a given state and optional county""" # Check cache first if state_code in self.all_monitors_cache: monitors = self.all_monitors_cache[state_code] # Filter by county if provided if county_code: monitors = [m for m in monitors if m.get("county_code") == county_code] # Filter by parameter if provided if parameter_code: monitors = [m for m in monitors if m.get("parameter_code") == parameter_code] return monitors # If not in cache, fetch from API # If we don't have API credentials, use mock data if not EMAIL or not API_KEY: return self.mock_get_monitors(state_code, county_code, parameter_code) # Convert state code to numeric format for API api_state_code = state_code if len(state_code) == 2 and state_code in self.state_code_mapping: api_state_code = self.state_code_mapping[state_code] # API endpoint for monitoring sites endpoint = f"{EPA_AQS_API_BASE_URL}/monitors/byState" params = { "email": EMAIL, "key": API_KEY, "state": api_state_code, "bdate": "20240101", # Beginning date (YYYYMMDD) "edate": "20240414", # End date (YYYYMMDD) } if county_code: params["county"] = county_code if parameter_code: params["param"] = parameter_code try: response = requests.get(endpoint, params=params) data = response.json() # Handle the specific response structure if isinstance(data, dict): if "Data" in data and isinstance(data["Data"], list): return data["Data"] elif "Header" in data and isinstance(data["Header"], list): if len(data["Header"]) > 0 and data["Header"][0].get("status") == "Success": return data.get("Data", []) else: print(f"Header does not contain success status: {data['Header']}") # Special case - return mock data if we can't parse the API response print(f"Using mock data instead of API response for state {state_code}") return self.mock_get_monitors(state_code, county_code, parameter_code) else: print(f"Unexpected response format for monitors: {type(data)}") return self.mock_get_monitors(state_code, county_code, parameter_code) except Exception as e: print(f"Error fetching monitors: {e}") return self.mock_get_monitors(state_code, county_code, parameter_code) def get_counties(self, state_code): """Fetch counties for a given state""" # If we don't have API credentials, use mock data if not EMAIL or not API_KEY: return self.mock_get_counties(state_code) # Convert state code to numeric format for API api_state_code = state_code if len(state_code) == 2 and state_code in self.state_code_mapping: api_state_code = self.state_code_mapping[state_code] endpoint = f"{EPA_AQS_API_BASE_URL}/list/countiesByState" params = { "email": EMAIL, "key": API_KEY, "state": api_state_code } try: response = requests.get(endpoint, params=params) data = response.json() # Handle the specific response structure we observed counties = [] if isinstance(data, dict) and "Data" in data and isinstance(data["Data"], list): counties = data["Data"] # Format as "code: name" for dropdown result = [] for c in counties: code = c.get("code") value = c.get("value_represented") if code and value: result.append(f"{code}: {value}") return result except Exception as e: print(f"Error fetching counties: {e}") return [] def get_parameters(self): """Fetch available parameter codes (pollutants)""" # If we don't have API credentials, use mock data if not EMAIL or not API_KEY: return self.mock_get_parameters() endpoint = f"{EPA_AQS_API_BASE_URL}/list/parametersByClass" params = { "email": EMAIL, "key": API_KEY, "pc": "CRITERIA" # Filter to criteria pollutants } try: response = requests.get(endpoint, params=params) data = response.json() # Handle the specific response structure we observed parameters = [] if isinstance(data, dict) and "Data" in data and isinstance(data["Data"], list): parameters = data["Data"] # Format as "code: name" for dropdown result = [] for p in parameters: code = p.get("code") value = p.get("value_represented") if not code: code = p.get("parameter_code") if not value: value = p.get("parameter_name") if code and value: result.append(f"{code}: {value}") return result except Exception as e: print(f"Error fetching parameters: {e}") return [] def get_latest_aqi(self, state_code, county_code=None, parameter_code=None): """Fetch the latest AQI data for monitors""" # Check cache first if state_code in self.all_aqi_data_cache: aqi_data = self.all_aqi_data_cache[state_code] # Filter by county if provided if county_code: aqi_data = [item for item in aqi_data if item.get('county_code') == county_code] # Filter by parameter if provided if parameter_code: aqi_data = [item for item in aqi_data if item.get('parameter_code') == parameter_code] return aqi_data # If not in cache, fetch from API # If we don't have API credentials, use mock data if not EMAIL or not API_KEY: return self._generate_mock_aqi_data(state_code) # Convert state code to numeric format for API api_state_code = state_code if len(state_code) == 2 and state_code in self.state_code_mapping: api_state_code = self.state_code_mapping[state_code] endpoint = f"{EPA_AQS_API_BASE_URL}/dailyData/byState" params = { "email": EMAIL, "key": API_KEY, "state": api_state_code, "bdate": "20240314", # Beginning date (YYYYMMDD) - last 30 days "edate": "20240414", # End date (YYYYMMDD) - current date } # The county parameter might not be supported here either # We'll filter results by county after getting them if parameter_code: params["param"] = parameter_code try: response = requests.get(endpoint, params=params) data = response.json() # Handle the specific response structure we observed aqi_data = [] if isinstance(data, dict) and "Data" in data and isinstance(data["Data"], list): aqi_data = data["Data"] # Filter by county if provided if county_code and aqi_data: aqi_data = [item for item in aqi_data if item.get('county_code') == county_code] return aqi_data except Exception as e: print(f"Error fetching AQI data: {e}") return [] def create_map(self, focus_state=None, county_code=None, parameter_code=None): """Create a map with air quality monitoring stations for all states""" # Get all monitors - either focused on a state or all states all_monitors = [] if focus_state: # Get monitors just for the focused state monitors = self.get_monitors(focus_state, county_code, parameter_code) if monitors: all_monitors.extend(monitors) else: # Get all monitors from all states for state_code in self.states.keys(): monitors = self.get_monitors(state_code) if monitors: all_monitors.extend(monitors) if not all_monitors: return {"map": "No monitoring stations found for the selected criteria.", "legend": "", "data": None} # Convert to DataFrame for easier manipulation df = pd.DataFrame(all_monitors) # Create a map centered on the continental US if focus_state: # Center on the focused state center_lat = df["latitude"].mean() center_lon = df["longitude"].mean() zoom_start = 7 else: # Center on continental US center_lat = 39.8283 center_lon = -98.5795 zoom_start = 4 # Create a map with a specific width and height m = folium.Map(location=[center_lat, center_lon], zoom_start=zoom_start, width='100%', height=700) # Add a marker cluster marker_cluster = MarkerCluster().add_to(m) # Get all AQI data all_aqi_data = [] aqi_data_by_site = {} # Process AQI data for each state for state_code in self.states.keys(): # Skip states we don't need if focusing on a specific state if focus_state and state_code != focus_state: continue # Get AQI data for this state state_aqi_data = self.get_latest_aqi(state_code, county_code, parameter_code) if state_aqi_data: all_aqi_data.extend(state_aqi_data) # Create a lookup dictionary by site ID for item in state_aqi_data: site_id = f"{item['state_code']}-{item['county_code']}-{item['site_number']}" if site_id not in aqi_data_by_site: aqi_data_by_site[site_id] = [] aqi_data_by_site[site_id].append(item) # Add markers for each monitoring station for _, row in df.iterrows(): site_id = f"{row['state_code']}-{row['county_code']}-{row['site_number']}" # Default marker color is blue color = "blue" # Get AQI data for this station if available station_aqi_data = aqi_data_by_site.get(site_id, []) latest_aqi = None aqi_category = None # Create a table of pollutant readings if available aqi_readings_html = "" if station_aqi_data: # Sort by date (most recent first) station_aqi_data.sort(key=lambda x: x.get('date_local', ''), reverse=True) # Get latest AQI for marker color if station_aqi_data[0].get('aqi'): latest_aqi = station_aqi_data[0].get('aqi') aqi_category = self.get_aqi_category(latest_aqi) color = self.aqi_categories.get(aqi_category, "blue") # Create a table of readings aqi_readings_html = """
Date | Pollutant | AQI | Category |
---|---|---|---|
{date} | {pollutant} | {aqi_value} | {category} |
Showing 10 of {len(station_aqi_data)} readings
" # Create popup content with detailed information popup_content = f"""Site ID: {site_id}
Address: {row.get('address', 'N/A')}
City: {row.get('city_name', 'N/A')}
County: {row.get('county_name', 'N/A')}
State: {row.get('state_name', self.states.get(row.get('state_code', ''), 'Unknown'))}
Parameter: {row.get('parameter_name', 'N/A')}
Coordinates: {row.get('latitude', 'N/A')}, {row.get('longitude', 'N/A')}
{aqi_readings_html}No air quality data available for the selected criteria.
" # Filter by state if provided if state_filter: # Convert state code if needed if len(state_filter) == 2: state_filter = self.state_code_mapping.get(state_filter, state_filter) aqi_data = [item for item in aqi_data if item.get('state_code') == state_filter] # Filter by county if provided if county_filter: aqi_data = [item for item in aqi_data if item.get('county_code') == county_filter] if not aqi_data or len(aqi_data) == 0: return "No air quality data available for the selected criteria.
" # Sort by date (most recent first) and then by AQI value (highest first) sorted_data = sorted(aqi_data, key=lambda x: (x.get('date_local', ''), -int(x.get('aqi', 0)) if x.get('aqi') and str(x.get('aqi')).isdigit() else 0), reverse=True) # Group by location to show the latest readings for each site site_data = {} for item in sorted_data: site_id = f"{item.get('state_code', '')}-{item.get('county_code', '')}-{item.get('site_number', '')}" param = item.get('parameter_code', '') key = f"{site_id}-{param}" if key not in site_data: site_data[key] = item # Create HTML table html = """Date | State | County | Location | Pollutant | AQI | Category |
---|---|---|---|---|---|---|
{date} | {state_name} | {county_code} | {location} | {pollutant} | {aqi_value} | {category} |
No air quality data available for the selected criteria.
" return map_html, data_html else: # Return error message or whatever was returned error_message = result if isinstance(result, str) else "An error occurred" return error_message, "No data available
" # Create the UI with gr.Blocks(title="Air Quality Monitoring Stations") as interface: gr.Markdown("# NOAA Air Quality Monitoring Stations Map") gr.Markdown(""" This application displays air quality monitoring stations across the United States and shows current air quality readings. **Note:** To use the actual EPA AQS API, you need to register for an API key and set `EPA_AQS_EMAIL` and `EPA_AQS_API_KEY` environment variables in your Hugging Face Space. For demonstration without an API key, the app shows sample data with more detailed information for California (CA), New York (NY), and Texas (TX). """) with gr.Row(): with gr.Column(scale=1): # State dropdown with empty default (all states) state_dropdown = gr.Dropdown( choices=[""] + list(app.states.keys()), label="Filter by State (Optional)", value="" ) # County dropdown (initially empty) county_dropdown = gr.Dropdown( choices=[], label="Filter by County (Optional)", allow_custom_value=True ) # Parameter dropdown (pollutant type) parameter_dropdown = gr.Dropdown( choices=app.mock_get_parameters(), label="Filter by Pollutant (Optional)", allow_custom_value=True ) # Button to update filters map_button = gr.Button("Update Filters") # Create two tabs for the map and data with gr.Tabs() as tabs: with gr.TabItem("Map"): # HTML component to display the map map_html = gr.HTML(label="Air Quality Monitoring Stations Map") with gr.TabItem("Air Quality Data"): # HTML component to display the air quality data data_html = gr.HTML(label="Air Quality Readings") # Set up event handlers state_dropdown.change( fn=update_counties, inputs=state_dropdown, outputs=county_dropdown ) map_button.click( fn=show_map_and_data, inputs=[state_dropdown, county_dropdown, parameter_dropdown], outputs=[map_html, data_html] ) # Load initial map when the app starts interface.load( fn=show_map_and_data, inputs=None, outputs=[map_html, data_html] ) return interface # Create and launch the app if __name__ == "__main__": air_quality_map_ui = create_air_quality_map_ui() air_quality_map_ui.launch()