File size: 8,151 Bytes
4c75d73 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
import asyncio
import json
import gradio as gr
from mcp import ClientSession
from mcp.client.sse import sse_client
from contextlib import AsyncExitStack
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
class SimpleMCPClient:
def __init__(self):
self.session = None
self.connected = False
self.tools = []
self.exit_stack = None
self.server_url = "https://chris4k-weather.hf.space/gradio_api/mcp/sse"
def connect(self) -> str:
"""Connect to the hardcoded MCP server"""
return loop.run_until_complete(self._connect())
async def _connect(self) -> str:
try:
# Clean up previous connection
if self.exit_stack:
await self.exit_stack.aclose()
self.exit_stack = AsyncExitStack()
# Connect to SSE MCP server
sse_transport = await self.exit_stack.enter_async_context(
sse_client(self.server_url)
)
read_stream, write_callable = sse_transport
self.session = await self.exit_stack.enter_async_context(
ClientSession(read_stream, write_callable)
)
await self.session.initialize()
# Get available tools
response = await self.session.list_tools()
self.tools = response.tools
self.connected = True
tool_names = [tool.name for tool in self.tools]
return f"✅ Connected to weather server!\nAvailable tools: {', '.join(tool_names)}"
except Exception as e:
self.connected = False
return f"❌ Connection failed: {str(e)}"
def get_weather(self, location: str) -> str:
"""Get weather for a location (city, country format)"""
if not self.connected:
return "❌ Not connected to server. Click Connect first."
if not location.strip():
return "❌ Please enter a location (e.g., 'Berlin, Germany')"
return loop.run_until_complete(self._get_weather(location))
async def _get_weather(self, location: str) -> str:
try:
# Parse location
if ',' in location:
city, country = [part.strip() for part in location.split(',', 1)]
else:
city = location.strip()
country = ""
# Find the weather tool
weather_tool = next((tool for tool in self.tools if 'weather' in tool.name.lower()), None)
if not weather_tool:
return "❌ Weather tool not found on server"
# Call the tool
params = {"city": city, "country": country}
result = await self.session.call_tool(weather_tool.name, params)
# Extract content properly
content_text = ""
if hasattr(result, 'content') and result.content:
if isinstance(result.content, list):
for content_item in result.content:
if hasattr(content_item, 'text'):
content_text += content_item.text
elif hasattr(content_item, 'content'):
content_text += str(content_item.content)
else:
content_text += str(content_item)
elif hasattr(result.content, 'text'):
content_text = result.content.text
else:
content_text = str(result.content)
if not content_text:
return "❌ No content received from server"
try:
# Try to parse as JSON
parsed = json.loads(content_text)
if isinstance(parsed, dict):
if 'error' in parsed:
return f"❌ Error: {parsed['error']}"
# Format weather data nicely
if 'current_weather' in parsed:
weather = parsed['current_weather']
formatted = f"🌍 **{parsed.get('location', 'Unknown')}**\n\n"
formatted += f"🌡️ Temperature: {weather.get('temperature_celsius', 'N/A')}°C\n"
formatted += f"🌤️ Conditions: {weather.get('weather_description', 'N/A')}\n"
formatted += f"💨 Wind: {weather.get('wind_speed_kmh', 'N/A')} km/h\n"
formatted += f"💧 Humidity: {weather.get('humidity_percent', 'N/A')}%\n"
return formatted
elif 'temperature (°C)' in parsed:
# Handle the original format from your server
formatted = f"🌍 **{parsed.get('location', 'Unknown')}**\n\n"
formatted += f"🌡️ Temperature: {parsed.get('temperature (°C)', 'N/A')}°C\n"
formatted += f"🌤️ Weather Code: {parsed.get('weather_code', 'N/A')}\n"
formatted += f"🕐 Timezone: {parsed.get('timezone', 'N/A')}\n"
formatted += f"🕒 Local Time: {parsed.get('local_time', 'N/A')}\n"
return formatted
else:
return f"✅ Weather data:\n```json\n{json.dumps(parsed, indent=2)}\n```"
except json.JSONDecodeError:
# If not JSON, return as text
return f"✅ Weather data:\n```\n{content_text}\n```"
return f"✅ Raw result:\n{content_text}"
except Exception as e:
return f"❌ Failed to get weather: {str(e)}"
# Global client
client = SimpleMCPClient()
def create_interface():
with gr.Blocks(title="Weather MCP Test", theme=gr.themes.Soft()) as demo:
gr.Markdown("# 🌤️ Weather MCP Test Client")
gr.Markdown("Simple client to test the weather MCP server")
# Connection
with gr.Row():
connect_btn = gr.Button("🔌 Connect to Weather Server", variant="primary")
status = gr.Textbox(
label="Status",
value="Click Connect to start",
interactive=False,
scale=2
)
# Weather query
with gr.Group():
gr.Markdown("### Get Weather")
with gr.Row():
location_input = gr.Textbox(
label="Location",
placeholder="e.g., Berlin, Germany",
value="Berlin, Germany",
scale=3
)
weather_btn = gr.Button("🌡️ Get Weather", scale=1)
weather_result = gr.Textbox(
label="Weather Result",
interactive=False,
lines=8,
placeholder="Weather information will appear here..."
)
# Examples
with gr.Group():
gr.Markdown("### 📝 Examples")
examples = gr.Examples(
examples=[
["Berlin, Germany"],
["Tokyo, Japan"],
["New York, USA"],
["London, UK"],
["Sydney, Australia"]
],
inputs=[location_input]
)
# Event handlers
connect_btn.click(
client.connect,
outputs=[status]
)
weather_btn.click(
client.get_weather,
inputs=[location_input],
outputs=[weather_result]
)
location_input.submit(
client.get_weather,
inputs=[location_input],
outputs=[weather_result]
)
return demo
if __name__ == "__main__":
demo = create_interface()
demo.launch(
server_name="0.0.0.0",
server_port=7777,
share=False,
show_error=True
)
|