# Sentinel_One / tools / tools.py
# BARBASTE Paul
# Initial commit
# dd370c5
import math
import requests
import matplotlib.pyplot as plt
import seaborn as sns
import tempfile
import os
from config import NASA_FIRMS_MAP_KEY
from datetime import datetime, timedelta
from smolagents import tool
from fpdf import FPDF
@tool
def get_coordinates(city: str) -> dict:
    """Get latitude and longitude of a city using OpenStreetMap Nominatim API.

    Only the best match is requested. The returned city value is the query
    string as supplied, not the name resolved by the geocoder.

    Args:
        city: Name of the city to get coordinates for

    Returns:
        Dict with city name, latitude, longitude, or error message
    """
    # Reject blank input locally instead of spending a request on it.
    if not isinstance(city, str) or not city.strip():
        return {"error": "City name must not be empty"}

    url = "https://nominatim.openstreetmap.org/search"
    params = {"q": city, "format": "json", "limit": 1}
    headers = {"User-Agent": "ClimateRiskTool/1.0"}
    try:
        response = requests.get(url, params=params, headers=headers, timeout=10)
        # Report HTTP failures (rate limiting, blocked client, outage) as a
        # status error rather than as an opaque JSON-decoding or indexing
        # message; this matches what get_earthquake_data already does.
        response.raise_for_status()
        matches = response.json()
        if not matches:
            return {"error": f"City '{city}' not found"}
        best = matches[0]
        return {
            "city": city,
            "latitude": float(best["lat"]),
            "longitude": float(best["lon"]),
        }
    except Exception as e:
        # Tool boundary: always hand the agent a dict, never an exception.
        return {"error": str(e)}
@tool
def get_weather_forecast(lat: float, lon: float) -> dict:
    """Get weather forecast data for risk analysis.

    Args:
        lat: Latitude coordinate
        lon: Longitude coordinate

    Returns:
        Dict with weather forecast data or error message
    """
    # Daily aggregates requested for the 7-day window; passed to requests as
    # a list value under the "daily" key.
    daily_variables = [
        "temperature_2m_max",
        "temperature_2m_min",
        "precipitation_sum",
        "wind_speed_10m_max",
        "wind_gusts_10m_max",
        "relative_humidity_2m_min",
    ]
    query = dict(
        latitude=lat,
        longitude=lon,
        daily=daily_variables,
        forecast_days=7,
        timezone="auto",
    )
    try:
        reply = requests.get(
            "https://api.open-meteo.com/v1/forecast", params=query, timeout=10
        )
        # The decoded body is returned untouched, whatever the HTTP status.
        return reply.json()
    except Exception as exc:
        return {"error": str(exc)}
@tool
def get_flood_data(lat: float, lon: float) -> dict:
    """Get flood forecast data.

    Args:
        lat: Latitude coordinate
        lon: Longitude coordinate

    Returns:
        Dict with flood forecast data or error message
    """
    endpoint = "https://flood-api.open-meteo.com/v1/flood"
    # Three river-discharge series over a 7-day window.
    discharge_series = ["river_discharge", "river_discharge_mean", "river_discharge_max"]
    query = dict(
        latitude=lat,
        longitude=lon,
        daily=discharge_series,
        forecast_days=7,
    )
    try:
        reply = requests.get(endpoint, params=query, timeout=10)
        # The decoded body is returned untouched, whatever the HTTP status.
        return reply.json()
    except Exception as exc:
        return {"error": str(exc)}
def _summarize_earthquake(feature: dict) -> dict:
    """Flatten one USGS GeoJSON feature into the fields exposed by the tool."""
    props = feature.get("properties") or {}
    # GeoJSON order is [longitude, latitude, depth]; tolerate a missing or
    # short geometry instead of failing the whole query on one odd feature.
    coords = (feature.get("geometry") or {}).get("coordinates") or []
    return {
        "magnitude": props.get("mag"),
        "place": props.get("place"),
        "time": props.get("time"),
        "depth": coords[2] if len(coords) > 2 else None,
        "latitude": coords[1] if len(coords) > 1 else None,
        "longitude": coords[0] if coords else None,
        "alert": props.get("alert"),
        "significance": props.get("sig"),
        "event_type": props.get("type"),
        "title": props.get("title"),
    }


@tool
def get_earthquake_data(
    lat: float, lon: float, radius_km: float = 100, days: int = 30
) -> dict:
    """Get raw earthquake data from USGS.

    Returns events of magnitude 1.0 or greater within the radius, newest
    first, for the period ending at the current UTC time.

    Args:
        lat: Latitude coordinate
        lon: Longitude coordinate
        radius_km: Search radius in kilometers (default 100km)
        days: Number of days to look back (default 30 days)

    Returns:
        Dict with raw earthquake data from USGS
    """
    # Imported here because the module-level import only brings in
    # datetime and timedelta.
    from datetime import timezone

    url = "https://earthquake.usgs.gov/fdsnws/event/1/query"
    try:
        # Build the window from the current UTC instant with full timestamps.
        # A bare date means midnight, which would cut off everything that
        # happened today; local time would shift the window by the UTC offset.
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(days=days)
        stamp = "%Y-%m-%dT%H:%M:%S"
        params = {
            "format": "geojson",
            "starttime": start_time.strftime(stamp),
            "endtime": end_time.strftime(stamp),
            "latitude": lat,
            "longitude": lon,
            "maxradiuskm": radius_km,
            "minmagnitude": 1.0,
            "orderby": "time-desc",
        }
        response = requests.get(url, params=params, timeout=15)
        response.raise_for_status()
        data = response.json()
        earthquakes = [
            _summarize_earthquake(feature) for feature in data.get("features", [])
        ]
        return {
            "earthquakes": earthquakes,
            "query_location": {
                "lat": lat,
                "lon": lon,
                "radius_km": radius_km,
                "days": days,
            },
            "data_source": "USGS",
        }
    except Exception as e:
        # Tool boundary: always hand the agent a dict, never an exception.
        return {"error": str(e)}
@tool
def get_nasa_fire_data(
    lat: float, lon: float, radius_km: float = 50, days: int = 2
) -> dict:
    """Get raw wildfire detection data from NASA FIRMS satellites.

    Queries the MODIS and VIIRS (NOAA-20) near-real-time products over a
    bounding box around the point. Each sensor is fetched independently, so
    one failing sensor does not discard the other's detections. Failed
    sensors are listed under a "source_errors" key when any occurred.

    Args:
        lat: Latitude coordinate
        lon: Longitude coordinate
        radius_km: Search radius in kilometers (default 50km)
        days: Number of days to look back (default 2 days)

    Returns:
        Dict with raw fire detection data from NASA satellites
    """
    if not NASA_FIRMS_MAP_KEY or NASA_FIRMS_MAP_KEY == "your-nasa-firms-api-key-here":
        return {"error": "NASA FIRMS API key not configured in .env file"}
    try:
        # Approximate the radius as a box, using 111 km per degree of latitude
        # and shrinking a degree of longitude by cos(latitude). The cosine is
        # floored so the division stays finite at the poles.
        lat_offset = radius_km / 111.0
        cos_lat = max(abs(math.cos(math.radians(lat))), 1e-9)
        lon_offset = radius_km / (111.0 * cos_lat)

        # Clamp to valid coordinates. A box crossing the antimeridian is
        # truncated rather than wrapped.
        south = max(lat - lat_offset, -90.0)
        north = min(lat + lat_offset, 90.0)
        west = max(lon - lon_offset, -180.0)
        east = min(lon + lon_offset, 180.0)

        # The FIRMS area endpoint expects west,south,east,north.
        bbox = f"{west},{south},{east},{north}"

        base_url = "https://firms.modaps.eosdis.nasa.gov/api/area/csv"
        products = (("MODIS_NRT", "MODIS"), ("VIIRS_NOAA20_NRT", "VIIRS"))

        all_fires = []
        source_errors = {}
        for product, label in products:
            product_url = f"{base_url}/{NASA_FIRMS_MAP_KEY}/{product}/{bbox}/{days}"
            try:
                response = requests.get(product_url, timeout=15)
                if response.status_code != 200:
                    source_errors[label] = f"HTTP {response.status_code}"
                elif response.text.strip():
                    all_fires.extend(_parse_nasa_csv(response.text, label))
            except Exception as exc:
                # Best effort per sensor. Record only the exception type,
                # because request error messages embed the URL and the URL
                # contains the API key.
                source_errors[label] = type(exc).__name__

        result = {
            "fires": all_fires,
            "query_location": {
                "lat": lat,
                "lon": lon,
                "radius_km": radius_km,
                "days": days,
            },
            "data_source": "NASA_FIRMS",
        }
        if source_errors:
            # Lets the caller tell "no fires" apart from "could not check".
            result["source_errors"] = source_errors
        return result
    except Exception as e:
        return {"error": str(e)}
def _parse_nasa_csv(csv_text: str, source: str) -> list:
"""Parse NASA FIRMS CSV data.
Args:
csv_text: CSV text data from NASA FIRMS API
source: Source identifier (MODIS or VIIRS)
Returns:
List of fire detection dictionaries
"""
fires = []
lines = csv_text.strip().split("\n")
if len(lines) < 2:
return fires
for line in lines[1:]:
try:
values = line.split(",")
if len(values) >= 9:
fires.append(
{
"latitude": float(values[0]),
"longitude": float(values[1]),
"brightness": float(values[2]) if values[2] else 0,
"scan": float(values[3]) if values[3] else 0,
"track": float(values[4]) if values[4] else 0,
"acq_date": values[5],
"acq_time": values[6],
"satellite": values[7],
"confidence": int(values[8]) if values[8].isdigit() else 50,
"version": values[9] if len(values) > 9 else "",
"bright_t31": (
float(values[10]) if len(values) > 10 and values[10] else 0
),
"frp": (
float(values[11]) if len(values) > 11 and values[11] else 0
),
"daynight": values[12] if len(values) > 12 else "",
"source": source,
}
)
except (ValueError, IndexError):
continue
return fires
@tool
def find_local_emergency_resources(lat: float, lon: float) -> dict:
    """Find nearby emergency facilities (hospitals, fire stations, police).

    Searches OpenStreetMap through the Overpass API within 10 km of the
    point and returns at most the first five elements of the response.

    Args:
        lat: Latitude coordinate
        lon: Longitude coordinate

    Returns:
        Dict with local emergency resources or error message
    """
    # NOTE(review): only OSM nodes are queried, so facilities mapped as
    # ways or relations are not returned - confirm this is intended.
    query = f"""
        [out:json][timeout:15];
        (
          node[amenity=hospital](around:10000,{lat},{lon});
          node[amenity=fire_station](around:10000,{lat},{lon});
          node[amenity=police](around:10000,{lat},{lon});
        );
        out center meta;
    """
    try:
        response = requests.post(
            "https://overpass-api.de/api/interpreter", data=query, timeout=20
        )
        # A failed request must not look like "no facilities nearby":
        # raise so it is reported through the error key below.
        response.raise_for_status()
        elements = response.json().get("elements", [])
        resources = []
        for element in elements[:5]:
            tags = element.get("tags", {})
            resources.append(
                {
                    "name": tags.get("name", "Unnamed facility"),
                    "type": tags.get("amenity", "unknown"),
                    # Fall back to the query point when an element carries
                    # no coordinates of its own.
                    "latitude": element.get("lat", lat),
                    "longitude": element.get("lon", lon),
                }
            )
        return {"local_resources": resources}
    except Exception as e:
        # Tool boundary: always hand the agent a dict, never an exception.
        return {"error": str(e)}
def _clean_series(values, length):
    """Return the first *length* values with None replaced by NaN (plotted as gaps)."""
    return [float("nan") if value is None else value for value in values[:length]]


def _draw_weather(ax, daily):
    """Draw temperature lines and precipitation bars; return False if data is missing."""
    highs = daily.get("temperature_2m_max") or []
    lows = daily.get("temperature_2m_min") or []
    rain = daily.get("precipitation_sum") or []
    n_days = min(len(highs), len(lows), len(rain))
    if n_days == 0:
        return False
    # barplot places its categories at 0..n-1, so the lines are drawn at the
    # same positions and the bars supply the 1..n day labels.
    positions = list(range(n_days))
    day_labels = [pos + 1 for pos in positions]
    sns.lineplot(
        x=positions,
        y=_clean_series(highs, n_days),
        ax=ax,
        label="Max Temp",
        color="red",
    )
    sns.lineplot(
        x=positions,
        y=_clean_series(lows, n_days),
        ax=ax,
        label="Min Temp",
        color="blue",
    )
    sns.barplot(
        x=day_labels,
        y=_clean_series(rain, n_days),
        ax=ax,
        color="gray",
        alpha=0.5,
    )
    ax.set_title("Weather Forecast")
    ax.set_xlabel("Day")
    ax.set_ylabel("Values")
    ax.legend()
    return True


def _draw_earthquakes(ax, earthquakes):
    """Scatter magnitude against depth; return False if no event has both values."""
    # Filter per event so depth, magnitude and place stay aligned, and test
    # against None so a depth or magnitude of 0 is kept.
    points = [
        (eq["depth"], eq["magnitude"], eq.get("place") or "Unknown location")
        for eq in earthquakes
        if eq.get("magnitude") is not None and eq.get("depth") is not None
    ]
    if not points:
        return False
    depths, magnitudes, places = (list(column) for column in zip(*points))
    sns.scatterplot(
        x=depths, y=magnitudes, hue=places, ax=ax, palette="tab10", s=100
    )
    ax.set_title("Earthquake Analysis")
    ax.set_xlabel("Depth (km)")
    ax.set_ylabel("Magnitude")
    ax.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
    return True


def _draw_fires(ax, fires):
    """Overlay brightness and confidence histograms; return False if both are empty."""
    series = (
        ("brightness", "Brightness", "orange", {}),
        ("confidence", "Confidence", "green", {"alpha": 0.5}),
    )
    drew = False
    for key, label, color, extra in series:
        values = [fire[key] for fire in fires if fire.get(key) is not None]
        if not values:
            continue
        sns.histplot(
            values,
            bins=20,
            ax=ax,
            # A density estimate needs at least two distinct values.
            kde=len(set(values)) > 1,
            color=color,
            label=label,
            **extra,
        )
        drew = True
    if not drew:
        return False
    ax.set_title("Wildfire Brightness vs Confidence")
    ax.set_xlabel("Value")
    ax.legend()
    return True


def _report_output_path(filename):
    """Return a path for the PDF that outlives the temporary plot directory."""
    name = filename or "climate_risk_report.pdf"
    if os.path.isabs(name):
        path = name
    else:
        # mkdtemp() directories are not removed automatically, so the
        # returned path stays valid after this tool returns.
        path = os.path.join(tempfile.mkdtemp(prefix="climate_report_"), name)
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    return path


@tool
def generate_analysis_report(
    data: dict, filename: str = "climate_risk_report.pdf"
) -> dict:
    """Generate a consolidated analysis report with visualizations.

    The report is a PDF with a title and one chart per available section.
    Missing or empty sections are skipped. Flood data is not plotted.

    Args:
        data: Consolidated data from various tools. Sections that are read
            - "weather_forecast" as returned by get_weather_forecast
            - "earthquake_data" as returned by get_earthquake_data
            - "fire_data" as returned by get_nasa_fire_data
        filename: Desired filename for the exported PDF report. A relative
            name is placed in a new persistent temporary directory.

    Returns:
        Dict with success message and file path or error
    """
    try:
        pdf = FPDF()
        pdf.set_auto_page_break(auto=True, margin=15)
        pdf.add_page()
        pdf.set_font("Arial", size=12)
        pdf.set_text_color(50, 50, 50)
        pdf.set_font("Arial", style="B", size=16)
        pdf.cell(0, 10, "Climate Risk Analysis Report", ln=True, align="C")
        pdf.ln(10)

        sections = (
            (
                "weather_plot",
                _draw_weather,
                (data.get("weather_forecast") or {}).get("daily") or {},
            ),
            (
                "earthquake_plot",
                _draw_earthquakes,
                (data.get("earthquake_data") or {}).get("earthquakes") or [],
            ),
            (
                "fire_plot",
                _draw_fires,
                (data.get("fire_data") or {}).get("fires") or [],
            ),
        )

        # Only the intermediate PNGs live in the self-cleaning directory.
        with tempfile.TemporaryDirectory() as plot_dir:
            for plot_name, draw, payload in sections:
                if not payload:
                    continue
                fig, ax = plt.subplots(figsize=(8, 5))
                try:
                    if not draw(ax, payload):
                        continue
                    plot_path = os.path.join(plot_dir, f"{plot_name}.png")
                    # "tight" keeps a legend placed outside the axes in frame.
                    fig.savefig(plot_path, bbox_inches="tight")
                finally:
                    # Always release the figure, including on error or skip.
                    plt.close(fig)
                pdf.image(plot_path, x=10, y=None, w=180)
                pdf.ln(10)

            # Write the PDF while the plot images still exist, but to a
            # location that is not deleted when this block exits.
            pdf_output_path = _report_output_path(filename)
            pdf.output(pdf_output_path)
        return {"success": True, "file_path": pdf_output_path}
    except Exception as e:
        return {"error": str(e)}
@tool
def get_full_daily_forecast(lat: float, lon: float) -> dict:
    """
    Get all available daily weather forecast parameters from Open-Meteo API.

    Args:
        lat: Latitude.
        lon: Longitude.

    Returns:
        Dict with all daily forecast data or error.
    """
    # Variables grouped by theme; they are flattened in this exact order into
    # one comma-separated "daily" value.
    variable_groups = (
        ("temperature_2m_max", "temperature_2m_mean", "temperature_2m_min"),
        ("apparent_temperature_max", "apparent_temperature_mean", "apparent_temperature_min"),
        ("precipitation_sum", "rain_sum", "showers_sum", "snowfall_sum"),
        ("precipitation_hours",),
        (
            "precipitation_probability_max",
            "precipitation_probability_mean",
            "precipitation_probability_min",
        ),
        ("weather_code", "sunrise", "sunset"),
        ("sunshine_duration", "daylight_duration"),
        ("wind_speed_10m_max", "wind_gusts_10m_max", "wind_direction_10m_dominant"),
        ("shortwave_radiation_sum", "et0_fao_evapotranspiration"),
        ("uv_index_max", "uv_index_clear_sky_max"),
    )
    daily_value = ",".join(name for group in variable_groups for name in group)
    query = dict(latitude=lat, longitude=lon, timezone="auto", daily=daily_value)
    try:
        reply = requests.get(
            "https://api.open-meteo.com/v1/forecast", params=query, timeout=10
        )
        # The decoded body is returned untouched, whatever the HTTP status.
        return reply.json()
    except Exception as exc:
        return {"error": str(exc)}
@tool
def climate_change_data(
    lat: float,
    lon: float,
    start_date: str = "1950-01-01",
    end_date: str = "2050-12-31",
    models: list[str] = None
) -> dict:
    """
    Get all available daily climate parameters from Open-Meteo Climate API.

    Args:
        lat: Latitude.
        lon: Longitude.
        start_date: Start date in yyyy-mm-dd (default 1950-01-01).
        end_date: End date in yyyy-mm-dd (default 2050-12-31).
        models: Optional list of climate models (default: all models).

    Returns:
        Dict with all daily climate data or error.
    """
    # The full model list is substituted only when nothing was passed; an
    # explicitly empty list is forwarded as-is.
    chosen_models = models
    if chosen_models is None:
        chosen_models = [
            "CMCC_CM2_VHR4",
            "FGOALS_f3_H",
            "HiRAM_SIT_HR",
            "MRI_AGCM3_2_S",
            "EC_Earth3P_HR",
            "MPI_ESM1_2_XR",
            "NICAM16_8S",
        ]
    daily_value = ",".join(
        (
            "temperature_2m_max",
            "temperature_2m_min",
            "temperature_2m_mean",
            "cloud_cover_mean",
            "relative_humidity_2m_max",
            "relative_humidity_2m_min",
            "relative_humidity_2m_mean",
            "soil_moisture_0_to_10cm_mean",
            "precipitation_sum",
            "rain_sum",
            "snowfall_sum",
            "wind_speed_10m_mean",
            "wind_speed_10m_max",
            "pressure_msl_mean",
            "shortwave_radiation_sum",
        )
    )
    query = dict(
        latitude=lat,
        longitude=lon,
        start_date=start_date,
        end_date=end_date,
        models=",".join(chosen_models),
        daily=daily_value,
        timezone="auto",
    )
    try:
        # Uses a 60 s timeout, longer than the other tools in this file.
        reply = requests.get(
            "https://climate-api.open-meteo.com/v1/climate", params=query, timeout=60
        )
        return reply.json()
    except Exception as exc:
        return {"error": str(exc)}
@tool
def get_full_air_quality_forecast(
    lat: float,
    lon: float,
    forecast_days: int = 5,
    past_days: int = 0,
    domain: str = "auto"
) -> dict:
    """
    Get all available hourly air quality forecast parameters from Open-Meteo Air Quality API.

    Args:
        lat: Latitude.
        lon: Longitude.
        forecast_days: Number of forecast days (default 5, max 7).
        past_days: Number of past days (default 0, max 92).
        domain: 'auto', 'cams_europe', or 'cams_global'.

    Returns:
        Dict with all hourly air quality data or error.
    """
    # Out-of-range day counts are clamped, not rejected.
    clamped_forecast = min(max(forecast_days, 0), 7)
    clamped_past = min(max(past_days, 0), 92)

    hourly_value = ",".join(
        (
            "pm10",
            "pm2_5",
            "carbon_monoxide",
            "carbon_dioxide",
            "nitrogen_dioxide",
            "sulphur_dioxide",
            "ozone",
            "aerosol_optical_depth",
            "dust",
            "uv_index",
            "uv_index_clear_sky",
            "ammonia",
            "methane",
            "alder_pollen",
            "birch_pollen",
            "grass_pollen",
            "mugwort_pollen",
            "olive_pollen",
            "ragweed_pollen",
            "european_aqi",
            "us_aqi",
        )
    )
    query = dict(
        latitude=lat,
        longitude=lon,
        forecast_days=clamped_forecast,
        past_days=clamped_past,
        hourly=hourly_value,
        domains=domain,
        timezone="auto",
    )
    try:
        reply = requests.get(
            "https://air-quality-api.open-meteo.com/v1/air-quality",
            params=query,
            timeout=30,
        )
        # The decoded body is returned untouched, whatever the HTTP status.
        return reply.json()
    except Exception as exc:
        return {"error": str(exc)}
@tool
def get_full_marine_daily_forecast(lat: float, lon: float) -> dict:
    """
    Get all available daily marine forecast parameters from Open-Meteo Marine API.

    Args:
        lat: Latitude.
        lon: Longitude.

    Returns:
        Dict with all daily marine forecast data or error.
    """
    # Height, direction and period for total, wind and swell waves, followed
    # by the two peak-period variables; the order is preserved in the request.
    variable_groups = (
        ("wave_height_max", "wind_wave_height_max", "swell_wave_height_max"),
        (
            "wave_direction_dominant",
            "wind_wave_direction_dominant",
            "swell_wave_direction_dominant",
        ),
        ("wave_period_max", "wind_wave_period_max", "swell_wave_period_max"),
        ("wind_wave_peak_period_max", "swell_wave_peak_period_max"),
    )
    daily_value = ",".join(name for group in variable_groups for name in group)
    query = dict(latitude=lat, longitude=lon, timezone="auto", daily=daily_value)
    try:
        reply = requests.get(
            "https://marine-api.open-meteo.com/v1/marine", params=query, timeout=10
        )
        # The decoded body is returned untouched, whatever the HTTP status.
        return reply.json()
    except Exception as exc:
        return {"error": str(exc)}
@tool
def get_full_flood_daily_forecast(lat: float, lon: float) -> dict:
    """
    Get all available daily flood parameters from Open-Meteo Flood API.

    Args:
        lat: Latitude.
        lon: Longitude.

    Returns:
        Dict with all daily flood forecast data or error.
    """
    # Every statistic is a "river_discharge" series; the bare name comes
    # first, then the suffixed variants in request order.
    suffixes = ("", "_mean", "_median", "_max", "_min", "_p25", "_p75")
    daily_value = ",".join("river_discharge" + suffix for suffix in suffixes)
    query = dict(latitude=lat, longitude=lon, daily=daily_value)
    try:
        reply = requests.get(
            "https://flood-api.open-meteo.com/v1/flood", params=query, timeout=10
        )
        # The decoded body is returned untouched, whatever the HTTP status.
        return reply.json()
    except Exception as exc:
        return {"error": str(exc)}
@tool
def get_full_satellite_radiation(
    lat: float,
    lon: float,
    start_date: str = None,
    end_date: str = None,
    hourly_native: bool = False,
    tilt: int = 0,
    azimuth: int = 0
) -> dict:
    """
    Get all available hourly satellite solar radiation parameters from Open-Meteo Satellite API.

    Args:
        lat: Latitude.
        lon: Longitude.
        start_date: (optional) Start date (yyyy-mm-dd). If None, today.
        end_date: (optional) End date (yyyy-mm-dd). If None, today.
        hourly_native: Use native satellite temporal resolution (10/15/30min) if True, else hourly.
        tilt: Tilt for GTI (default 0 = horizontal).
        azimuth: Azimuth for GTI (default 0 = south).

    Returns:
        Dict with all hourly satellite solar radiation data or error.
    """
    # Each base variable is requested twice: the plain series and its
    # "_instant" counterpart, in that order.
    base_variables = (
        "shortwave_radiation",
        "diffuse_radiation",
        "direct_radiation",
        "direct_normal_irradiance",
        "global_tilted_irradiance",
        "terrestrial_radiation",
    )
    hourly_value = ",".join(
        base_variables + tuple(f"{name}_instant" for name in base_variables)
    )

    # "Today" is the current UTC date, used for whichever bound was omitted.
    today_iso = str(datetime.utcnow().date())
    first_day = today_iso if start_date is None else start_date
    last_day = today_iso if end_date is None else end_date

    query = dict(
        latitude=lat,
        longitude=lon,
        start_date=first_day,
        end_date=last_day,
        hourly=hourly_value,
        models="satellite_radiation_seamless",
        timezone="auto",
        tilt=tilt,
        azimuth=azimuth,
    )
    # The flag is sent only when native resolution is requested.
    if hourly_native:
        query["hourly_native"] = "true"
    try:
        reply = requests.get(
            "https://satellite-api.open-meteo.com/v1/archive", params=query, timeout=30
        )
        return reply.json()
    except Exception as exc:
        return {"error": str(exc)}