# Spaces: Sleeping  (appears to be hosting-page status text captured with the file; not part of the program)
import html
import json
import time
import urllib.parse
from datetime import datetime, timezone

import folium
import gradio as gr
import requests
class RainViewerMap:
    """Build Folium maps with a RainViewer radar overlay and NOAA NHC storm markers.

    Storm lookups are best-effort: every source that fails is recorded in
    ``storm_cache['errors']`` and the next source is tried, so callers always
    receive a list they can render.
    """

    # Timeout (seconds) for requests that previously could block forever.
    REQUEST_TIMEOUT = 15

    CURRENT_STORMS_URL = "https://www.nhc.noaa.gov/CurrentStorms.json"

    # (minimum sustained wind in knots, marker colour), strongest first.
    WIND_COLOR_SCALE = (
        (137, 'purple'),  # Category 5
        (113, 'red'),  # Category 4
        (96, 'orange'),  # Category 3
        (83, 'yellow'),  # Category 2
        (64, 'green'),  # Category 1
        (34, 'blue'),  # Tropical Storm
    )

    def __init__(self):
        """Set the service endpoints and an empty storm cache; performs no I/O."""
        self.api_url = "https://api.rainviewer.com/public/weather-maps.json"
        self.tile_host = "https://tilecache.rainviewer.com"
        self.nhc_api_base = "https://mapservices.weather.noaa.gov/tropical/rest/services/tropical/NHC_tropical_weather/MapServer"
        self.storm_cache = {}

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _escape(value):
        """Return *value* as text that is safe to embed in popup HTML."""
        return html.escape(str(value))

    @staticmethod
    def _format_timestamp(timestamp):
        """Format a Unix timestamp as ``YYYY-MM-DD HH:MM:SS UTC``."""
        moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        return moment.strftime('%Y-%m-%d %H:%M:%S UTC')

    @staticmethod
    def _radar_frames(radar_data):
        """Return the past + nowcast radar frames, tolerating missing keys."""
        if not isinstance(radar_data, dict):
            return []
        radar = radar_data.get('radar') or {}
        return list(radar.get('past') or []) + list(radar.get('nowcast') or [])

    @staticmethod
    def _is_atlantic_forecast_points_layer(layer_name):
        """Tell whether a layer name is an Atlantic (AT1-AT5) forecast-points layer."""
        name = (layer_name or '').lower()
        return 'forecast points' in name and any(
            f'at{slot}' in name for slot in range(1, 6)
        )

    @staticmethod
    def _parse_coordinate(value):
        """Convert a number or a string such as ``'79.2W'`` to a signed float.

        Returns ``None`` when *value* cannot be interpreted as a coordinate.
        """
        if isinstance(value, bool):
            return None
        if isinstance(value, (int, float)):
            return float(value)
        if not isinstance(value, str):
            return None
        text = value.strip().upper()
        if not text:
            return None
        sign = 1.0
        if text[-1] in 'NSEW':
            # South and West are the negative hemispheres.
            if text[-1] in 'SW':
                sign = -1.0
            text = text[:-1]
        try:
            return sign * float(text)
        except ValueError:
            return None

    @staticmethod
    def _sample_storm():
        """Return the hard-coded demonstration storm used when no source has data."""
        return {
            'source': 'Sample Data',
            'type': 'sample',
            'geojson': {
                'type': 'FeatureCollection',
                'features': [
                    {
                        'type': 'Feature',
                        'geometry': {
                            'type': 'Point',
                            'coordinates': [-80.0, 26.0],
                        },
                        'properties': {
                            'STORMNAME': 'Sample Hurricane',
                            'STORMTYPE': 'Hurricane',
                            'MAXWIND': '75',
                            'MSLP': '980',
                            'FHOUR': 'Current',
                        },
                    }
                ],
            },
        }

    # ------------------------------------------------------------------
    # Data fetching
    # ------------------------------------------------------------------
    def get_radar_data(self):
        """Fetch the RainViewer weather-maps index.

        Returns:
            The decoded JSON document, or ``None`` when the request fails,
            returns an HTTP error status, or the body is not valid JSON.
        """
        try:
            response = requests.get(self.api_url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            return response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"Error fetching radar data: {e}")
            return None

    def _fetch_current_storms(self, error_messages):
        """Approach 1: read active storms from NHC's CurrentStorms.json."""
        try:
            response = requests.get(self.CURRENT_STORMS_URL, timeout=10)
            if response.status_code != 200:
                error_messages.append(
                    f"CurrentStorms.json returned HTTP {response.status_code}"
                )
                return []
            payload = response.json()
            return [
                {'source': 'NHC CurrentStorms', 'data': storm, 'type': 'current_storms'}
                for storm in payload.get('activeStorms', [])
            ]
        except Exception as e:
            # Deliberately broad: any failure here (network, decoding, an
            # unexpected payload shape) is recorded and the next source is tried.
            error_messages.append(f"CurrentStorms.json failed: {e}")
            return []

    def _fetch_gis_storms(self, error_messages):
        """Approach 2: query the NOAA GIS forecast-points layers for AT1-AT5."""
        storms = []
        query_params = {
            'where': '1=1',
            'outFields': '*',
            'f': 'geojson',
            'returnGeometry': 'true',
        }
        try:
            layers_response = requests.get(
                f"{self.nhc_api_base}/layers?f=json", timeout=15
            )
            if layers_response.status_code != 200:
                error_messages.append(
                    f"Layers API returned HTTP {layers_response.status_code}"
                )
                return storms

            for layer in layers_response.json().get('layers', []):
                layer_name = layer.get('name') or ''
                if not self._is_atlantic_forecast_points_layer(layer_name):
                    continue
                layer_id = layer.get('id')
                try:
                    query_response = requests.get(
                        f"{self.nhc_api_base}/{layer_id}/query",
                        params=query_params,
                        timeout=15,
                    )
                    if query_response.status_code != 200:
                        continue
                    geojson_data = query_response.json()
                    # Layers with no features correspond to inactive slots.
                    if geojson_data.get('features'):
                        storms.append({
                            'source': 'NOAA GIS',
                            'layer_name': layer_name,
                            'layer_id': layer_id,
                            'geojson': geojson_data,
                            'type': 'gis_layer',
                        })
                except Exception as e:
                    # One bad layer must not hide the remaining ones.
                    error_messages.append(f"Layer {layer_id} query failed: {e}")
        except Exception as e:
            error_messages.append(f"Layers API failed: {e}")
        return storms

    def get_hurricane_data(self):
        """Collect active storm records, trying each source in turn.

        Order: NHC CurrentStorms.json, then the NOAA GIS forecast-points
        layers, then a hard-coded sample storm so the UI has something to
        draw. Messages gathered along the way are stored in
        ``storm_cache['errors']`` and the fetch time in
        ``storm_cache['last_update']``.

        Returns:
            A list of dicts whose ``'type'`` is ``'current_storms'``,
            ``'gis_layer'`` or ``'sample'``.
        """
        error_messages = []

        active_storms = self._fetch_current_storms(error_messages)
        if not active_storms:
            active_storms = self._fetch_gis_storms(error_messages)
        if not active_storms:
            active_storms = [self._sample_storm()]
            error_messages.append("No real storm data available, showing sample data")

        # Kept for the UI's debug panel.
        self.storm_cache['errors'] = error_messages
        self.storm_cache['last_update'] = datetime.now(timezone.utc).isoformat()
        return active_storms

    def get_storm_track_data(self, layer_id):
        """Fetch GeoJSON from the layer adjacent to *layer_id*.

        The layer queried is ``layer_id - 1`` (or ``layer_id + 1`` when
        *layer_id* is 0), on the assumption that the track layer sits next to
        the forecast-points layer.

        Returns:
            The decoded GeoJSON, or ``None`` on a non-200 response or error.
        """
        try:
            track_layer_id = layer_id - 1 if layer_id > 0 else layer_id + 1
            response = requests.get(
                f"{self.nhc_api_base}/{track_layer_id}/query",
                params={'where': '1=1', 'outFields': '*', 'f': 'geojson'},
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code == 200:
                return response.json()
            return None
        except (requests.RequestException, ValueError, TypeError) as e:
            print(f"Error fetching track data: {e}")
            return None

    # ------------------------------------------------------------------
    # Map construction
    # ------------------------------------------------------------------
    def create_map(self, lat=40.7128, lon=-74.0060, zoom=8, show_radar=True, time_index=0, show_hurricanes=True):
        """Create a Folium map and return it as embeddable HTML.

        Args:
            lat: Latitude of the map centre.
            lon: Longitude of the map centre.
            zoom: Initial zoom level.
            show_radar: Whether to add the RainViewer radar tile overlay.
            time_index: Index into the combined past + nowcast frame list.
            show_hurricanes: Whether to add storm markers.

        Returns:
            The map's HTML representation.
        """
        m = folium.Map(
            location=[lat, lon],
            zoom_start=zoom,
            tiles='OpenStreetMap',
        )

        if show_radar:
            self._add_radar_layer(m, lat, lon, time_index)
        if show_hurricanes:
            self.add_hurricane_layers(m)

        folium.LayerControl().add_to(m)
        return m._repr_html_()

    def _add_radar_layer(self, folium_map, lat, lon, time_index):
        """Overlay the radar frame at *time_index* plus a marker showing its time."""
        frames = self._radar_frames(self.get_radar_data())
        # An out-of-range (or negative) index simply means "no radar overlay".
        if not 0 <= time_index < len(frames):
            return

        frame = frames[time_index]
        tile_url = f"{self.tile_host}{frame['path']}/512/{{z}}/{{x}}/{{y}}/2/1_1.png"
        folium.raster_layers.TileLayer(
            tiles=tile_url,
            attr='RainViewer',
            name='Radar',
            overlay=True,
            control=True,
            opacity=0.6,
        ).add_to(folium_map)

        # Offset slightly from the centre so the marker does not hide it.
        folium.Marker(
            [lat + 0.1, lon + 0.1],
            popup=f"Radar Time: {self._format_timestamp(frame['time'])}",
            icon=folium.Icon(color='blue', icon='info-sign'),
        ).add_to(folium_map)

    def add_hurricane_layers(self, folium_map):
        """Fetch storm data and add a marker layer for every record found."""
        for storm_info in self.get_hurricane_data():
            storm_type = storm_info.get('type', 'unknown')
            if storm_type == 'current_storms':
                self.add_current_storms_markers(folium_map, storm_info['data'])
            elif storm_type == 'gis_layer':
                self._add_geojson_point_markers(
                    folium_map,
                    storm_info['geojson'],
                    layer_name=storm_info.get('layer_name', 'Unknown Layer'),
                )
            elif storm_type == 'sample':
                self._add_geojson_point_markers(
                    folium_map, storm_info['geojson'], is_sample=True
                )

    def _add_geojson_point_markers(self, folium_map, geojson_data, layer_name=None, is_sample=False):
        """Add a circle marker for every Point feature in *geojson_data*."""
        esc = self._escape
        for feature in geojson_data.get('features', []):
            geometry = feature.get('geometry') or {}
            coords = geometry.get('coordinates') or []
            if geometry.get('type') != 'Point' or len(coords) < 2:
                continue
            props = feature.get('properties') or {}

            if is_sample:
                storm_name = props.get('STORMNAME', 'Sample Storm')
                storm_type_detail = props.get('STORMTYPE', 'Sample')
                max_wind = props.get('MAXWIND', '75')
                pressure = props.get('MSLP', '980')
                popup_lines = [
                    f"<b>{esc(storm_name)} (Sample)</b>",
                    f"Type: {esc(storm_type_detail)}",
                    f"Max Wind: {esc(max_wind)} kt",
                    f"Pressure: {esc(pressure)} mb",
                    "Note: This is sample data for demonstration",
                ]
                radius, weight = 10, 3
            else:
                storm_name = props.get('STORMNAME', props.get('STORMNUM', 'Unknown Storm'))
                storm_type_detail = props.get('STORMTYPE', 'Unknown')
                max_wind = props.get('MAXWIND', props.get('INTENSITY', 'N/A'))
                pressure = props.get('MSLP', 'N/A')
                forecast_hour = props.get('FHOUR', props.get('TAU', 'Current'))
                popup_lines = [
                    f"<b>{esc(storm_name)}</b>",
                    f"Type: {esc(storm_type_detail)}",
                    f"Max Wind: {esc(max_wind)} kt",
                    f"Pressure: {esc(pressure)} mb",
                    f"Forecast Hour: {esc(forecast_hour)}",
                    f"Source: {esc(layer_name)}",
                ]
                radius, weight = 8, 2

            color = self.get_storm_color(max_wind)
            folium.CircleMarker(
                # GeoJSON stores [lon, lat]; Folium expects [lat, lon].
                location=[coords[1], coords[0]],
                radius=radius,
                popup="<br>".join(popup_lines),
                color=color,
                fill=True,
                fillColor=color,
                fillOpacity=0.7,
                weight=weight,
            ).add_to(folium_map)

    def add_current_storms_markers(self, folium_map, storm_data):
        """Add one marker for a storm record taken from CurrentStorms.json.

        Records that are not dicts, or that carry no usable position, are
        skipped without error.

        NOTE(review): the keys read here (``latitudeNumeric``/``latitude``,
        ``longitudeNumeric``/``longitude``, ``name``, ``classification``,
        ``intensity``, ``pressure``, ``lastUpdate``) are assumed from the NHC
        CurrentStorms.json layout - confirm against a live payload.
        """
        if not isinstance(storm_data, dict):
            return

        def coordinate(*keys):
            # Prefer the numeric field; fall back to the "26.8N"-style string.
            for key in keys:
                parsed = self._parse_coordinate(storm_data.get(key))
                if parsed is not None:
                    return parsed
            return None

        lat = coordinate('latitudeNumeric', 'latitude')
        lon = coordinate('longitudeNumeric', 'longitude')
        if lat is None or lon is None:
            return

        esc = self._escape
        intensity = storm_data.get('intensity', 'N/A')
        color = self.get_storm_color(intensity)
        popup_lines = [
            f"<b>{esc(storm_data.get('name', 'Unknown Storm'))}</b>",
            f"Type: {esc(storm_data.get('classification', 'Unknown'))}",
            f"Max Wind: {esc(intensity)} kt",
            f"Pressure: {esc(storm_data.get('pressure', 'N/A'))} mb",
            f"Updated: {esc(storm_data.get('lastUpdate', 'N/A'))}",
            "Source: NHC CurrentStorms",
        ]
        folium.CircleMarker(
            location=[lat, lon],
            radius=8,
            popup="<br>".join(popup_lines),
            color=color,
            fill=True,
            fillColor=color,
            fillOpacity=0.7,
            weight=2,
        ).add_to(folium_map)

    def add_storm_tracks(self, folium_map, track_data):
        """Draw every LineString feature in *track_data* as a red polyline."""
        for feature in track_data.get('features', []):
            geometry = feature.get('geometry') or {}
            if geometry.get('type') != 'LineString':
                continue
            # GeoJSON stores [lon, lat]; Folium expects [lat, lon].
            track_coords = [
                [point[1], point[0]] for point in geometry.get('coordinates') or []
            ]
            props = feature.get('properties') or {}
            storm_name = props.get('STORMNAME', 'Storm Track')
            folium.PolyLine(
                locations=track_coords,
                color='red',
                weight=3,
                opacity=0.8,
                popup=f"Track: {self._escape(storm_name)}",
            ).add_to(folium_map)

    def get_storm_color(self, max_wind):
        """Map a maximum wind speed in knots to a marker colour.

        Thresholds follow the Saffir-Simpson categories listed in
        ``WIND_COLOR_SCALE``. The string ``'N/A'`` counts as 0 kt, and values
        that cannot be converted to a number yield ``'gray'``.
        """
        try:
            wind_speed = float(max_wind) if max_wind != 'N/A' else 0
        except (TypeError, ValueError):
            return 'gray'

        for threshold, color in self.WIND_COLOR_SCALE:
            if wind_speed >= threshold:
                return color
        return 'lightblue'  # Tropical Depression

    def get_available_times(self):
        """List the radar frames as ``"<index>: <timestamp> UTC"`` strings.

        Returns:
            One entry per past/nowcast frame, or ``["No data available"]``
            when the radar index could not be fetched or has no ``radar`` key.
        """
        radar_data = self.get_radar_data()
        if not radar_data or 'radar' not in radar_data:
            return ["No data available"]
        return [
            f"{index}: {self._format_timestamp(frame['time'])}"
            for index, frame in enumerate(self._radar_frames(radar_data))
        ]
def _escape_html(value):
    """Return *value* as text that is safe to embed in HTML markup."""
    return html.escape(str(value))


def _to_float(value):
    """Return *value* as a float, or ``None`` when it is not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _nhc_storm_lines(storm_info):
    """Return detail lines for a record of type ``'nhc_storm'``.

    Shows the feature whose ``tau`` is 0 (current position) when there is
    one, otherwise the feature with the largest ``tau``.
    """
    esc = _escape_html
    storm_id = storm_info.get('storm_id', 'Unknown')
    features = (storm_info.get('forecast_points') or {}).get('features') or []

    current_pos = None
    latest_forecast = None
    latest_tau = -1.0
    for feature in features:
        props = feature.get('properties') or {}
        tau = _to_float(props.get('tau', 999))
        if tau is None:
            tau = 999.0  # same fallback as a missing tau
        if tau == 0:
            current_pos = props
        if latest_forecast is None or tau > latest_tau:
            latest_forecast, latest_tau = props, tau

    display_props = current_pos if current_pos else latest_forecast
    if not display_props:
        return []

    category = _to_float(display_props.get('ssnum', 0))
    category_text = int(category) if category is not None and category > 0 else 'N/A'
    lon_value = _to_float(display_props.get('lon', 'N/A'))
    lon_text = abs(lon_value) if lon_value is not None else 'N/A'

    return [
        f"<strong>Name:</strong> {esc(display_props.get('stormname', storm_id))}<br>",
        f"<strong>Type:</strong> {esc(display_props.get('stormtype', 'Unknown'))}<br>",
        f"<strong>Max Wind:</strong> {esc(display_props.get('maxwind', 'N/A'))} kt<br>",
        f"<strong>Pressure:</strong> {esc(display_props.get('mslp', 'N/A'))} mb<br>",
        f"<strong>Category:</strong> {category_text}<br>",
        f"<strong>Position:</strong> {esc(display_props.get('lat', 'N/A'))}°N, {lon_text}°W<br>",
        f"<strong>Advisory:</strong> #{esc(display_props.get('advisnum', 'N/A'))}<br>",
        f"<strong>Updated:</strong> {esc(display_props.get('advdate', 'N/A'))}<br>",
        f"<strong>Data Points:</strong> {len(features)}<br>",
    ]


def _storm_detail_lines(storm_info):
    """Return the HTML detail lines for one storm record, chosen by its ``'type'``."""
    esc = _escape_html
    storm_type = storm_info.get('type', 'unknown')

    if storm_type == 'current_storms':
        # NOTE(review): key names assumed from the NHC CurrentStorms.json
        # layout - confirm against a live payload.
        data = storm_info.get('data') or {}
        return [
            f"<strong>Name:</strong> {esc(data.get('name', 'Unknown'))}<br>",
            f"<strong>Type:</strong> {esc(data.get('classification', 'Unknown'))}<br>",
            f"<strong>Max Wind:</strong> {esc(data.get('intensity', 'N/A'))} kt<br>",
            f"<strong>Pressure:</strong> {esc(data.get('pressure', 'N/A'))} mb<br>",
            f"<strong>Position:</strong> {esc(data.get('latitude', 'N/A'))}, {esc(data.get('longitude', 'N/A'))}<br>",
            f"<strong>Updated:</strong> {esc(data.get('lastUpdate', 'N/A'))}<br>",
        ]

    if storm_type == 'gis_layer':
        features = (storm_info.get('geojson') or {}).get('features') or []
        # Summarise from the first forecast point; all points share the storm.
        props = (features[0].get('properties') or {}) if features else {}
        return [
            f"<strong>Layer:</strong> {esc(storm_info.get('layer_name', 'Unknown Layer'))}<br>",
            f"<strong>Name:</strong> {esc(props.get('STORMNAME', props.get('STORMNUM', 'Unknown Storm')))}<br>",
            f"<strong>Type:</strong> {esc(props.get('STORMTYPE', 'Unknown'))}<br>",
            f"<strong>Max Wind:</strong> {esc(props.get('MAXWIND', props.get('INTENSITY', 'N/A')))} kt<br>",
            f"<strong>Pressure:</strong> {esc(props.get('MSLP', 'N/A'))} mb<br>",
            f"<strong>Data Points:</strong> {len(features)}<br>",
        ]

    if storm_type == 'nhc_storm':
        return _nhc_storm_lines(storm_info)

    if storm_type == 'sample':
        return [
            "<em>Sample data for demonstration</em><br>",
            "Name: Sample Hurricane<br>",
            "Max Wind: 75 kt<br>",
            "Pressure: 980 mb<br>",
        ]

    return []


def _render_storm_list_html(hurricane_data, errors):
    """Render the storm records and the last three error messages as HTML."""
    if not hurricane_data:
        return "<p>No active storms currently detected.</p>"

    parts = [
        "<div style='font-family: Arial, sans-serif;'>",
        "<h4>Active Storms:</h4>",
    ]
    for number, storm_info in enumerate(hurricane_data, start=1):
        source = storm_info.get('source', 'Unknown')
        parts.append("<div style='margin: 10px 0; padding: 10px; border: 1px solid #ddd; border-radius: 5px;'>")
        parts.append(f"<strong>Storm #{number}</strong> - Source: {_escape_html(source)}<br>")
        parts.extend(_storm_detail_lines(storm_info))
        parts.append("</div>")

    if errors:
        parts.append("<div style='margin-top: 15px; padding: 10px; background-color: #fff3cd; border: 1px solid #ffeaa7; border-radius: 5px;'>")
        parts.append("<strong>Debug Info:</strong><br>")
        # Only the most recent messages, to keep the panel short.
        parts.extend(f"• {_escape_html(error)}<br>" for error in errors[-3:])
        parts.append("</div>")

    parts.append("</div>")
    return "".join(parts)


def create_gradio_app():
    """Build the Gradio Blocks UI for the radar map and storm tracker.

    Returns:
        The ``gr.Blocks`` application, ready for ``launch()``.
    """
    default_lat = 40.7128
    default_lon = -74.0060
    default_zoom = 8
    placeholder_times = ("No data available", "Loading...")

    rain_viewer = RainViewerMap()

    # Populate the dropdown up front; a failure here must not stop the UI
    # from being built, so fall back to a placeholder entry.
    try:
        time_options = rain_viewer.get_available_times()
        if not time_options:
            time_options = ["No data available"]
    except Exception as e:
        print(f"Error getting initial time options: {e}")
        time_options = ["Loading..."]

    with gr.Blocks(title="RainViewer Radar Map") as demo:
        gr.Markdown("# Weather Radar & Hurricane Tracker")
        gr.Markdown("Interactive weather map with radar data from RainViewer and hurricane tracking from NOAA NHC")

        with gr.Row():
            with gr.Column(scale=1):
                lat_input = gr.Number(
                    label="Latitude",
                    value=default_lat,
                    info="Map center latitude",
                )
                lon_input = gr.Number(
                    label="Longitude",
                    value=default_lon,
                    info="Map center longitude",
                )
                zoom_input = gr.Slider(
                    minimum=1,
                    maximum=15,
                    value=default_zoom,
                    step=1,
                    label="Zoom Level",
                )
                show_radar = gr.Checkbox(
                    label="Show Radar",
                    value=True,
                )
                show_hurricanes = gr.Checkbox(
                    label="Show Hurricanes",
                    value=True,
                )
                time_dropdown = gr.Dropdown(
                    choices=time_options,
                    label="Radar Time",
                    value=time_options[-1] if time_options else None,
                    info="Select radar timestamp",
                )
                gr.Markdown("### Hurricane Information")
                storm_list = gr.HTML(
                    value="Loading storm data...",
                    label="Active Storms",
                )
                update_btn = gr.Button("Update Map", variant="primary")
                refresh_times_btn = gr.Button("Refresh Times")
                refresh_storms_btn = gr.Button("Refresh Storms")

            with gr.Column(scale=2):
                map_html = gr.HTML(
                    value=rain_viewer.create_map(),
                    label="Radar Map",
                )

        def update_map(lat, lon, zoom, show_radar_flag, show_hurricanes_flag, selected_time):
            """Rebuild the map HTML from the current control values."""
            # A cleared number field arrives as None; fall back to the defaults
            # instead of letting map creation fail.
            lat = default_lat if lat is None else lat
            lon = default_lon if lon is None else lon
            zoom = default_zoom if zoom is None else zoom

            time_index = 0
            if selected_time and ":" in selected_time and selected_time not in placeholder_times:
                try:
                    time_index = int(selected_time.split(":")[0])
                    # Clamp to the frames that are available right now.
                    available_times = rain_viewer.get_available_times()
                    if time_index >= len(available_times):
                        time_index = len(available_times) - 1 if available_times else 0
                    time_index = max(time_index, 0)
                except (ValueError, IndexError):
                    time_index = 0  # Default to first time if parsing fails

            return rain_viewer.create_map(lat, lon, zoom, show_radar_flag, time_index, show_hurricanes_flag)

        def refresh_times():
            """Reload the dropdown choices and select the first entry."""
            new_times = rain_viewer.get_available_times()
            safe_value = new_times[0] if new_times else None
            return gr.Dropdown(choices=new_times, value=safe_value)

        def get_storm_list_html():
            """Fetch storm data and render it, with recent errors, as HTML."""
            hurricane_data = rain_viewer.get_hurricane_data()
            errors = rain_viewer.storm_cache.get('errors', [])
            return _render_storm_list_html(hurricane_data, errors)

        def initialize_app():
            """Fill the storm panel and refresh the time choices on page load."""
            storm_html = get_storm_list_html()
            current_times = rain_viewer.get_available_times()
            safe_time_value = current_times[0] if current_times else None
            return storm_html, gr.Dropdown(choices=current_times, value=safe_time_value)

        map_inputs = [lat_input, lon_input, zoom_input, show_radar, show_hurricanes, time_dropdown]

        update_btn.click(
            fn=update_map,
            inputs=map_inputs,
            outputs=map_html,
        )
        refresh_times_btn.click(
            fn=refresh_times,
            outputs=time_dropdown,
        )
        refresh_storms_btn.click(
            fn=get_storm_list_html,
            outputs=storm_list,
        )

        # Initialize components on load
        demo.load(
            fn=initialize_app,
            outputs=[storm_list, time_dropdown],
        )

        # Auto-update only on manual controls (not time dropdown to prevent conflicts)
        for input_component in [lat_input, lon_input, zoom_input, show_radar, show_hurricanes]:
            input_component.change(
                fn=update_map,
                inputs=map_inputs,
                outputs=map_html,
            )

        # Only update map when time dropdown is explicitly changed by user
        time_dropdown.select(
            fn=update_map,
            inputs=map_inputs,
            outputs=map_html,
        )

    return demo
if __name__ == "__main__":
    # Script entry point: build the UI and serve it. share=True is passed
    # through to Gradio's launch() unchanged.
    create_gradio_app().launch(share=True)