|
"""
|
|
SearchHF Oracle Addon - MCP-based Hugging Face search interface with NPC spawning
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
import json
|
|
from typing import Dict, Any, Optional, List
|
|
import gradio as gr
|
|
from mcp import ClientSession
|
|
from mcp.client.sse import sse_client
|
|
from contextlib import AsyncExitStack
|
|
|
|
try:
|
|
|
|
from ..interfaces.npc_addon import NPCAddon
|
|
from .generic_mcp_server_addon import register_mcp_results_as_npcs, list_active_mcp_npcs, remove_mcp_npc, clear_all_mcp_npcs
|
|
except ImportError:
|
|
|
|
try:
|
|
from src.interfaces.npc_addon import NPCAddon
|
|
from src.addons.generic_mcp_server_addon import register_mcp_results_as_npcs, list_active_mcp_npcs, remove_mcp_npc, clear_all_mcp_npcs
|
|
except ImportError:
|
|
|
|
print("[WARNING] Could not import NPC addon dependencies - running in limited mode")
|
|
|
|
class NPCAddon:
|
|
def __init__(self):
|
|
pass
|
|
|
|
@property
|
|
def addon_id(self):
|
|
return "searchhf_oracle"
|
|
|
|
@property
|
|
def addon_name(self):
|
|
return "SearchHF Oracle"
|
|
|
|
def register_mcp_results_as_npcs(results):
|
|
print(f"[WARNING] NPC registration not available - would register {len(results)} NPCs")
|
|
return []
|
|
|
|
def list_active_mcp_npcs():
|
|
return {}
|
|
|
|
def remove_mcp_npc(npc_id):
|
|
return False
|
|
|
|
def clear_all_mcp_npcs():
|
|
return 0
|
|
|
|
class SearchHFOracleAddon(NPCAddon):
|
|
"""SearchHF Oracle Addon for searching Hugging Face using MCP and spawning NPCs."""
|
|
|
|
def __init__(self):
|
|
|
|
self.name = "SearchHF Oracle"
|
|
self.description = "Advanced Hugging Face search using specialized MCP server. Search and add any HF MCP to the game world."
|
|
self.version = "1.0.0"
|
|
self.author = "MMOP Team"
|
|
|
|
|
|
self.character = "🔍"
|
|
self.position = (200, 100)
|
|
self.npc_name = "SearchHF Oracle"
|
|
|
|
|
|
self.mcp_server_url = "https://chris4k-searchhfformcp.hf.space/gradio_api/mcp/sse"
|
|
self.connected = False
|
|
self.last_connection_attempt = 0
|
|
self.connection_cooldown = 30
|
|
|
|
|
|
self.available_tools = []
|
|
|
|
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
|
|
super().__init__()
|
|
|
|
|
|
try:
|
|
self.loop = asyncio.get_event_loop()
|
|
except RuntimeError:
|
|
self.loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(self.loop)
|
|
|
|
self.session = None
|
|
self.exit_stack = None
|
|
self.tools = []
|
|
|
|
@property
|
|
def addon_id(self) -> str:
|
|
"""Unique identifier for this add-on"""
|
|
return "searchhf_oracle"
|
|
|
|
@property
|
|
def addon_name(self) -> str:
|
|
"""Display name for this add-on"""
|
|
return "SearchHF Oracle"
|
|
|
|
@property
|
|
def npc_config(self) -> Dict:
|
|
"""NPC configuration for auto-placement in world"""
|
|
return {
|
|
'id': 'searchhf_oracle',
|
|
'name': self.npc_name,
|
|
'x': self.position[0],
|
|
'y': self.position[1],
|
|
'char': self.character,
|
|
'type': 'oracle',
|
|
'personality': 'searchhf',
|
|
'description': self.description
|
|
}
|
|
|
|
@property
|
|
def ui_tab_name(self) -> str:
|
|
"""UI tab name for this addon"""
|
|
return "SearchHF Oracle"
|
|
|
|
def handle_command(self, player_id: str, command: str) -> str:
|
|
"""Handle player commands via private messages"""
|
|
try:
|
|
if command.startswith("searchhf "):
|
|
query = command[9:].strip()
|
|
if not query:
|
|
return "❌ Please provide a search query (e.g., 'searchhf sentiment analysis')"
|
|
|
|
result = self.loop.run_until_complete(self._call_search_tool(query))
|
|
return result
|
|
|
|
elif command.startswith("spawn_mcp "):
|
|
query = command[10:].strip()
|
|
if not query:
|
|
return "❌ Please provide a search query for MCP spawning"
|
|
|
|
try:
|
|
|
|
self.logger.info(f"[SearchHF Spawn] ========== STARTING SPAWN PROCESS ==========")
|
|
self.logger.info(f"[SearchHF Spawn] Query: '{query}'")
|
|
self.logger.info(f"[SearchHF Spawn] Connected status: {self.connected}")
|
|
self.logger.info(f"[SearchHF Spawn] Available tools: {[tool.name for tool in self.tools]}")
|
|
|
|
self.logger.info(f"[SearchHF Spawn] About to call _call_search_tool_raw...")
|
|
result_data = self.loop.run_until_complete(self._call_search_tool_raw(query))
|
|
self.logger.info(f"[SearchHF Spawn] _call_search_tool_raw completed")
|
|
|
|
self.logger.info(f"[SearchHF Spawn] Received result_data type: {type(result_data)}")
|
|
self.logger.info(f"[SearchHF Spawn] Received result_data keys: {list(result_data.keys()) if isinstance(result_data, dict) else 'Not a dict'}")
|
|
self.logger.info(f"[SearchHF Spawn] Received result_data: {result_data}")
|
|
|
|
|
|
if not isinstance(result_data, dict):
|
|
self.logger.error(f"[SearchHF Spawn] Expected dict, got {type(result_data)}: {result_data}")
|
|
return f"❌ Invalid response format from search: expected dict, got {type(result_data)}"
|
|
|
|
if result_data.get("status") == "success":
|
|
spaces = result_data.get("results", [])[:5]
|
|
self.logger.info(f"[SearchHF Spawn] Found {len(spaces)} spaces")
|
|
|
|
if spaces:
|
|
self.logger.info(f"[SearchHF Spawn] First space data: {spaces[0]}")
|
|
else:
|
|
self.logger.warning(f"[SearchHF Spawn] No spaces found in results")
|
|
|
|
if not spaces:
|
|
return "❌ No MCP spaces found in search results"
|
|
|
|
|
|
valid_spaces = []
|
|
for i, space in enumerate(spaces):
|
|
if isinstance(space, dict) and space.get("mcp_server_url"):
|
|
valid_spaces.append(space)
|
|
else:
|
|
self.logger.warning(f"[SearchHF Spawn] Space {i} is invalid or missing mcp_server_url: {space}")
|
|
|
|
if not valid_spaces:
|
|
return f"❌ Found {len(spaces)} spaces but none have valid MCP server URLs"
|
|
|
|
self.logger.info(f"[SearchHF Spawn] {len(valid_spaces)} valid spaces ready for spawning")
|
|
spawned_npcs = register_mcp_results_as_npcs(valid_spaces)
|
|
self.logger.info(f"[SearchHF Spawn] register_mcp_results_as_npcs returned: {spawned_npcs}")
|
|
|
|
if spawned_npcs:
|
|
return f"✅ Spawned {len(spawned_npcs)} MCP NPCs from search: '{query}'\nNPCs: {', '.join(spawned_npcs)}"
|
|
else:
|
|
return f"❌ Failed to spawn NPCs. Found {len(valid_spaces)} valid spaces but none were successfully created."
|
|
else:
|
|
error_msg = result_data.get('message', 'Unknown error')
|
|
self.logger.error(f"[SearchHF Spawn] Search failed: {error_msg}")
|
|
return f"❌ Search failed: {error_msg}"
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"[SearchHF Spawn] Exception during MCP spawning: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return f"❌ Error during MCP spawning: {str(e)}"
|
|
|
|
elif command == "list_mcp_npcs":
|
|
active_npcs = list_active_mcp_npcs()
|
|
if not active_npcs:
|
|
return "🔌 No active MCP NPCs found"
|
|
|
|
npc_info = ["🔌 **Active MCP NPCs:**"]
|
|
for npc_id, info in active_npcs.items():
|
|
npc_info.append(f"• {info.get('name', npc_id)} - {info.get('description', 'No description')}")
|
|
|
|
return "\n".join(npc_info)
|
|
|
|
elif command.startswith("remove_mcp "):
|
|
npc_name = command[11:].strip()
|
|
npc_id = npc_name.lower().replace(" ", "_")
|
|
|
|
if remove_mcp_npc(npc_id):
|
|
return f"✅ Removed MCP NPC: {npc_name}"
|
|
else:
|
|
return f"❌ Could not find MCP NPC: {npc_name}"
|
|
|
|
elif command == "clear_mcp_npcs":
|
|
count = clear_all_mcp_npcs()
|
|
return f"✅ Removed {count} MCP NPCs from the game world"
|
|
|
|
elif command == "searchhf_connect":
|
|
result = self.connect_to_mcp()
|
|
if self.connected:
|
|
return f"✅ Connected to SearchHF MCP server with {len(self.tools)} tools"
|
|
else:
|
|
return f"❌ Connection failed: {result}"
|
|
|
|
elif command == "searchhf_status":
|
|
status = "🟢 Connected" if self.connected else "🔴 Disconnected"
|
|
return f"SearchHF Oracle Status: {status} | Tools: {len(self.tools)}"
|
|
|
|
elif command == "searchhf_tools":
|
|
if not self.tools:
|
|
return "❌ No tools available. Try connecting first with 'searchhf_connect'"
|
|
|
|
tools_list = [f"- {tool.name}: {getattr(tool, 'description', 'No description')}"
|
|
for tool in self.tools]
|
|
return f"🛠️ **Available SearchHF Tools:**\n" + "\n".join(tools_list)
|
|
|
|
elif command == "searchhf_help":
|
|
return """🔍 **SearchHF Oracle Help**
|
|
|
|
**Commands:**
|
|
- searchhf <query> - Search Hugging Face MCP spaces
|
|
- spawn_mcp <query> - Search and spawn MCP servers as NPCs
|
|
- list_mcp_npcs - List active MCP NPCs
|
|
- remove_mcp <name> - Remove MCP NPC by name
|
|
- clear_mcp_npcs - Remove all MCP NPCs
|
|
- searchhf_connect - Connect to MCP server
|
|
- searchhf_status - Check status
|
|
- searchhf_tools - List available tools
|
|
- searchhf_help - Show this help
|
|
|
|
**Examples:**
|
|
• searchhf sentiment analysis
|
|
• searchhf weather oracle
|
|
• spawn_mcp image generation
|
|
• list_mcp_npcs
|
|
• remove_mcp weather_oracle
|
|
|
|
⚡ **Powered by MCP (Model Context Protocol)**"""
|
|
|
|
return "❓ Unknown command. Use 'searchhf_help' for available commands."
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"[{self.name}] Error handling command {command}: {e}")
|
|
return f"❌ Error processing command: {str(e)}"
|
|
|
|
def on_startup(self):
|
|
"""Called when the addon is loaded during game startup"""
|
|
try:
|
|
|
|
result = self.connect_to_mcp()
|
|
self.logger.info(f"[{self.name}] Startup connection: {result}")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"[{self.name}] Error during startup: {e}")
|
|
self.connected = False
|
|
|
|
def connect_to_mcp(self) -> str:
|
|
"""Synchronous connect to the SearchHF MCP server."""
|
|
current_time = time.time()
|
|
if current_time - self.last_connection_attempt < self.connection_cooldown:
|
|
return "⏳ Please wait before retrying connection..."
|
|
|
|
self.last_connection_attempt = current_time
|
|
|
|
try:
|
|
return self.loop.run_until_complete(self._connect())
|
|
except Exception as e:
|
|
self.connected = False
|
|
return f"❌ Connection failed: {e}"
|
|
|
|
async def _connect(self) -> str:
|
|
"""Async MCP connection using SSE."""
|
|
try:
|
|
|
|
if self.exit_stack:
|
|
await self.exit_stack.aclose()
|
|
|
|
self.exit_stack = AsyncExitStack()
|
|
|
|
|
|
sse_transport = await self.exit_stack.enter_async_context(
|
|
sse_client(self.mcp_server_url)
|
|
)
|
|
read_stream, write_callable = sse_transport
|
|
|
|
self.session = await self.exit_stack.enter_async_context(
|
|
ClientSession(read_stream, write_callable)
|
|
)
|
|
await self.session.initialize()
|
|
|
|
|
|
response = await self.session.list_tools()
|
|
self.tools = response.tools
|
|
|
|
self.connected = True
|
|
tool_names = [tool.name for tool in self.tools]
|
|
return f"✅ Connected to SearchHF MCP server!\nAvailable tools: {', '.join(tool_names)}"
|
|
|
|
except Exception as e:
|
|
self.connected = False
|
|
return f"❌ Connection failed: {str(e)}"
|
|
|
|
def get_interface(self) -> gr.Interface:
|
|
"""Create the Gradio interface for the SearchHF addon."""
|
|
|
|
def search_huggingface(query: str, max_results: int = 10, min_likes: int = 0) -> str:
|
|
"""Search Hugging Face using the MCP server."""
|
|
if not query.strip():
|
|
return "❌ Please enter a search query"
|
|
|
|
if not self.connected:
|
|
connect_result = self.connect_to_mcp()
|
|
if not self.connected:
|
|
return f"❌ Connection failed: {connect_result}"
|
|
try:
|
|
return self.loop.run_until_complete(self._call_search_tool(query, max_results, min_likes))
|
|
except Exception as e:
|
|
return f"❌ Search failed: {str(e)}"
|
|
|
|
def spawn_mcp_npcs(query: str, max_results: int = 5) -> str:
|
|
"""Search and automatically spawn MCP servers as NPCs."""
|
|
print(f"[DEBUG SearchHF] spawn_mcp_npcs called with query: '{query}', max_results: {max_results}")
|
|
if not query.strip():
|
|
return "❌ Please enter a search query"
|
|
|
|
if not self.connected:
|
|
connect_result = self.connect_to_mcp()
|
|
if not self.connected:
|
|
return f"❌ Connection failed: {connect_result}"
|
|
|
|
try:
|
|
|
|
self.logger.info(f"[SearchHF Spawn UI] ========== UI SPAWN STARTING ==========")
|
|
self.logger.info(f"[SearchHF Spawn UI] Query: '{query}', Max results: {max_results}")
|
|
self.logger.info(f"[SearchHF Spawn UI] Connection status: {self.connected}")
|
|
self.logger.info(f"[SearchHF Spawn UI] Available tools: {[tool.name for tool in self.tools]}")
|
|
|
|
self.logger.info(f"[SearchHF Spawn UI] About to call _call_search_tool_raw...")
|
|
result_data = self.loop.run_until_complete(self._call_search_tool_raw(query, max_results))
|
|
self.logger.info(f"[SearchHF Spawn UI] _call_search_tool_raw completed")
|
|
|
|
|
|
self.logger.info(f"[SearchHF Spawn UI] Received result_data type: {type(result_data)}")
|
|
self.logger.info(f"[SearchHF Spawn UI] Result data keys: {list(result_data.keys()) if isinstance(result_data, dict) else 'Not a dict'}")
|
|
self.logger.info(f"[SearchHF Spawn UI] Result data: {result_data}")
|
|
|
|
|
|
if not isinstance(result_data, dict):
|
|
self.logger.error(f"[SearchHF Spawn UI] Expected dict from _call_search_tool_raw, got {type(result_data)}: {result_data}")
|
|
return f"❌ Invalid response format from search tool: {type(result_data)}"
|
|
|
|
if result_data.get("status") == "success":
|
|
spaces = result_data.get("results", [])[:max_results]
|
|
self.logger.info(f"[SearchHF Spawn UI] Found {len(spaces)} spaces to spawn")
|
|
|
|
if not spaces:
|
|
return "❌ No MCP spaces found in search results"
|
|
|
|
self.logger.info(f"[SearchHF Spawn UI] About to call register_mcp_results_as_npcs with {len(spaces)} spaces")
|
|
spawned_npcs = register_mcp_results_as_npcs(spaces)
|
|
self.logger.info(f"[SearchHF Spawn UI] register_mcp_results_as_npcs returned: {spawned_npcs}")
|
|
|
|
if spawned_npcs:
|
|
return f"✅ Spawned {len(spawned_npcs)} MCP NPCs from search: '{query}'\nNPCs: {', '.join(spawned_npcs)}"
|
|
else:
|
|
return f"❌ Failed to spawn NPCs. Found {len(spaces)} spaces but none were successfully created."
|
|
else:
|
|
error_msg = result_data.get('message', 'Unknown error')
|
|
self.logger.error(f"[SearchHF Spawn UI] Search failed: {error_msg}")
|
|
return f"❌ Search failed: {error_msg}"
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"[SearchHF Spawn UI] Exception during spawning: {e}")
|
|
import traceback
|
|
self.logger.error(f"[SearchHF Spawn UI] Traceback: {traceback.format_exc()}")
|
|
return f"❌ Error spawning NPCs: {str(e)}"
|
|
|
|
def list_spawned_npcs() -> str:
|
|
"""List all currently spawned MCP NPCs."""
|
|
active_npcs = list_active_mcp_npcs()
|
|
if not active_npcs:
|
|
return "🔌 No active MCP NPCs found"
|
|
|
|
npc_info = ["🔌 **Active MCP NPCs:**\n"]
|
|
for npc_id, info in active_npcs.items():
|
|
npc_info.append(f"• **{info.get('name', npc_id)}** - {info.get('description', 'No description')}")
|
|
|
|
return "\n".join(npc_info)
|
|
|
|
def remove_spawned_npc(npc_name: str) -> str:
|
|
"""Remove a spawned MCP NPC by name."""
|
|
if not npc_name.strip():
|
|
return "❌ Please enter an NPC name to remove"
|
|
|
|
|
|
npc_id = npc_name.lower().replace(" ", "_")
|
|
|
|
if remove_mcp_npc(npc_id):
|
|
return f"✅ Removed MCP NPC: {npc_name}"
|
|
else:
|
|
return f"❌ Could not find MCP NPC: {npc_name}"
|
|
|
|
def clear_all_spawned_npcs() -> str:
|
|
"""Remove all spawned MCP NPCs."""
|
|
count = clear_all_mcp_npcs()
|
|
if count > 0:
|
|
return f"✅ Removed {count} MCP NPCs from the game world"
|
|
else:
|
|
return "ℹ️ No MCP NPCs to remove"
|
|
|
|
def connect_to_server() -> str:
|
|
"""Connect to the MCP server."""
|
|
try:
|
|
return self.connect_to_mcp()
|
|
except Exception as e:
|
|
return f"❌ Connection error: {str(e)}"
|
|
|
|
def get_status() -> str:
|
|
"""Get current connection status."""
|
|
status = "🟢 Connected" if self.connected else "🔴 Disconnected"
|
|
return f"""**SearchHF Oracle Status:**
|
|
- Connection: {status}
|
|
- MCP Server: {self.mcp_server_url}
|
|
- Available Tools: {len(self.tools)}
|
|
- Tools: {', '.join([tool.name for tool in self.tools]) if self.tools else 'None'}"""
|
|
|
|
def list_tools() -> str:
|
|
"""List available MCP tools."""
|
|
if not self.tools:
|
|
return "❌ No tools available. Try connecting first."
|
|
|
|
tools_info = ["**Available SearchHF Tools:**"]
|
|
for tool in self.tools:
|
|
tools_info.append(f"• **{tool.name}**: {getattr(tool, 'description', 'No description')}")
|
|
|
|
return "\n".join(tools_info)
|
|
|
|
|
|
with gr.Blocks(title=f"{self.character} {self.name}") as interface:
|
|
gr.Markdown(f"""
|
|
# {self.character} {self.name}
|
|
|
|
Advanced Hugging Face search using specialized MCP integration.
|
|
Search models, datasets, papers, and spaces with enhanced capabilities.
|
|
|
|
**MCP Server:** `{self.mcp_server_url}`
|
|
""")
|
|
|
|
with gr.Tab("🔍 Search"):
|
|
with gr.Row():
|
|
query_input = gr.Textbox(
|
|
label="Search Query",
|
|
placeholder="e.g., sentiment analysis, weather oracle, image generation",
|
|
scale=3
|
|
)
|
|
search_btn = gr.Button("🔍 Search", variant="primary", scale=1)
|
|
|
|
with gr.Row():
|
|
max_results = gr.Slider(1, 20, 10, label="Max Results")
|
|
min_likes = gr.Slider(0, 50, 0, label="Min Likes")
|
|
|
|
search_output = gr.Textbox(
|
|
label="Search Results",
|
|
lines=15,
|
|
interactive=False
|
|
)
|
|
|
|
with gr.Tab("🎮 MCP NPC Manager"):
|
|
with gr.Row():
|
|
spawn_query = gr.Textbox(
|
|
label="Search & Spawn Query",
|
|
placeholder="Search for MCP servers to spawn as NPCs",
|
|
scale=3
|
|
)
|
|
spawn_btn = gr.Button("🎮 Spawn NPCs", variant="primary", scale=1)
|
|
|
|
spawn_results = gr.Slider(1, 10, 5, label="Max NPCs to Spawn")
|
|
spawn_output = gr.Textbox(label="Spawn Results", lines=3, interactive=False)
|
|
|
|
gr.Markdown("### Manage Spawned NPCs")
|
|
with gr.Row():
|
|
list_btn = gr.Button("📋 List NPCs")
|
|
clear_btn = gr.Button("🗑️ Clear All", variant="secondary")
|
|
|
|
with gr.Row():
|
|
remove_input = gr.Textbox(label="Remove NPC by Name", scale=3)
|
|
remove_btn = gr.Button("🗑️ Remove", scale=1)
|
|
|
|
npc_output = gr.Textbox(label="NPC Management", lines=10, interactive=False)
|
|
|
|
with gr.Tab("🔧 Connection"):
|
|
with gr.Row():
|
|
connect_btn = gr.Button("🔗 Connect", variant="primary")
|
|
status_btn = gr.Button("📊 Status")
|
|
tools_btn = gr.Button("🛠️ List Tools")
|
|
|
|
connection_output = gr.Textbox(
|
|
label="Connection Status",
|
|
lines=8,
|
|
interactive=False
|
|
)
|
|
|
|
with gr.Tab("ℹ️ Help"):
|
|
gr.Markdown("""
|
|
### 🔍 SearchHF Oracle Help
|
|
|
|
**Search Tab:**
|
|
- Enter queries like "sentiment analysis", "weather oracle", "image generation"
|
|
- Adjust max results and minimum likes filters
|
|
- Results show verified MCP servers with connection details
|
|
|
|
**MCP NPC Manager:**
|
|
- Search and automatically spawn MCP servers as game NPCs
|
|
- Manage spawned NPCs (list, remove, clear all)
|
|
- Each spawned NPC represents a working MCP server
|
|
|
|
**Connection Tab:**
|
|
- Connect to the SearchHF MCP server
|
|
- Check connection status and available tools
|
|
- Troubleshoot connection issues
|
|
|
|
**Commands (via private message):**
|
|
- `searchhf <query>` - Search Hugging Face MCP spaces
|
|
- `spawn_mcp <query>` - Search and spawn MCP servers as NPCs
|
|
- `list_mcp_npcs` - List active MCP NPCs
|
|
- `remove_mcp <name>` - Remove MCP NPC by name
|
|
- `clear_mcp_npcs` - Remove all MCP NPCs
|
|
- `searchhf_connect` - Connect to MCP server
|
|
- `searchhf_status` - Check status
|
|
- `searchhf_tools` - List available tools
|
|
- `searchhf_help` - Show help
|
|
|
|
**Example Commands:**
|
|
- `searchhf weather oracle`
|
|
- `spawn_mcp sentiment analysis`
|
|
- `list_mcp_npcs`
|
|
- `remove_mcp weather_oracle`
|
|
|
|
⚡ **Powered by MCP (Model Context Protocol)**
|
|
""")
|
|
|
|
|
|
search_btn.click(
|
|
search_huggingface,
|
|
inputs=[query_input, max_results, min_likes],
|
|
outputs=[search_output]
|
|
)
|
|
|
|
query_input.submit(
|
|
search_huggingface,
|
|
inputs=[query_input, max_results, min_likes],
|
|
outputs=[search_output]
|
|
)
|
|
|
|
spawn_btn.click(
|
|
spawn_mcp_npcs,
|
|
inputs=[spawn_query, spawn_results],
|
|
outputs=[spawn_output]
|
|
)
|
|
|
|
list_btn.click(
|
|
list_spawned_npcs,
|
|
outputs=[npc_output]
|
|
)
|
|
|
|
clear_btn.click(
|
|
clear_all_spawned_npcs,
|
|
outputs=[npc_output]
|
|
)
|
|
|
|
remove_btn.click(
|
|
remove_spawned_npc,
|
|
inputs=[remove_input],
|
|
outputs=[npc_output] )
|
|
|
|
connect_btn.click(
|
|
connect_to_server,
|
|
outputs=[connection_output]
|
|
)
|
|
status_btn.click(
|
|
get_status,
|
|
outputs=[connection_output]
|
|
)
|
|
|
|
tools_btn.click(
|
|
list_tools,
|
|
outputs=[connection_output]
|
|
)
|
|
|
|
return interface
|
|
|
|
async def _call_search_tool_raw(self, query: str, max_results: int = 10, min_likes: int = 0) -> Dict:
|
|
"""Call the search tool and return raw JSON data for programmatic use."""
|
|
self.logger.info(f"[SearchHF Raw] ========== _call_search_tool_raw STARTING ==========")
|
|
self.logger.info(f"[SearchHF Raw] Query: '{query}', Max results: {max_results}, Min likes: {min_likes}")
|
|
self.logger.info(f"[SearchHF Raw] Connected: {self.connected}")
|
|
|
|
if not self.connected:
|
|
self.logger.info(f"[SearchHF Raw] Not connected, attempting to connect...")
|
|
conn_result = await self._connect()
|
|
self.logger.info(f"[SearchHF Raw] Connection attempt result: {conn_result}")
|
|
if not self.connected:
|
|
self.logger.error(f"[SearchHF Raw] Connection failed: {conn_result}")
|
|
return {"status": "error", "message": f"Connection failed: {conn_result}"}
|
|
|
|
self.logger.info(f"[SearchHF Raw] Available tools: {[tool.name for tool in self.tools]}")
|
|
tool = next((t for t in self.tools if 'search' in t.name.lower()), None)
|
|
if not tool:
|
|
available_tools = [t.name for t in self.tools]
|
|
self.logger.error(f"[SearchHF Raw] SearchHF tool not found. Available tools: {', '.join(available_tools)}")
|
|
return {"status": "error", "message": f"SearchHF tool not found. Available tools: {', '.join(available_tools)}"}
|
|
|
|
self.logger.info(f"[SearchHF Raw] Found search tool: {tool.name}")
|
|
|
|
try:
|
|
|
|
|
|
params = {
|
|
'query': query,
|
|
'max_results': max_results,
|
|
'min_likes': min_likes,
|
|
'author_filter': "",
|
|
'tag_filter': "",
|
|
'sort_by': "verified",
|
|
'created_after': "",
|
|
'include_private': False,
|
|
'verify_mcp': True,
|
|
'min_age_days': 0,
|
|
'max_age_days': 365 }
|
|
|
|
self.logger.info(f"[SearchHF Raw] Calling tool '{tool.name}' with params: {params}")
|
|
result = await self.session.call_tool(tool.name, params)
|
|
self.logger.info(f"[SearchHF Raw] Tool call completed successfully")
|
|
self.logger.info(f"[SearchHF Raw] Result type: {type(result)}")
|
|
self.logger.info(f"[SearchHF Raw] Result hasattr content: {hasattr(result, 'content')}")
|
|
if hasattr(result, 'content'):
|
|
self.logger.info(f"[SearchHF Raw] Result.content type: {type(result.content)}")
|
|
self.logger.info(f"[SearchHF Raw] Result.content: {result.content}")
|
|
|
|
content_text = ""
|
|
if hasattr(result, 'content') and result.content:
|
|
if isinstance(result.content, list):
|
|
for content_item in result.content:
|
|
if hasattr(content_item, 'text'):
|
|
content_text += content_item.text
|
|
elif hasattr(content_item, 'content'):
|
|
content_text += str(content_item.content)
|
|
else:
|
|
content_text += str(content_item)
|
|
elif hasattr(result.content, 'text'):
|
|
content_text = result.content.text
|
|
else:
|
|
content_text = str(result.content)
|
|
|
|
self.logger.info(f"[SearchHF Raw] Extracted content_text length: {len(content_text)}")
|
|
self.logger.info(f"[SearchHF Raw] Content_text empty check: {not content_text or content_text.strip() == ''}")
|
|
if not content_text or content_text.strip() == "":
|
|
self.logger.error(f"[SearchHF Raw] Empty response from MCP server for query: {query}")
|
|
return {"status": "error", "message": "Empty response from SearchHF MCP server"}
|
|
|
|
content_text = content_text.strip()
|
|
self.logger.info(f"[SearchHF Raw] Stripped content_text length: {len(content_text)}")
|
|
|
|
|
|
self.logger.info(f"[SearchHF Raw] Raw content received (first 200 chars): {content_text[:200]}")
|
|
|
|
self.logger.info(f"[SearchHF Raw] About to attempt JSON parsing...")
|
|
try:
|
|
|
|
self.logger.info(f"[SearchHF Raw] Checking if content starts with JSON characters...")
|
|
self.logger.info(f"[SearchHF Raw] Starts with '{{': {content_text.startswith('{')}")
|
|
self.logger.info(f"[SearchHF Raw] Starts with '[': {content_text.startswith('[')}")
|
|
if not content_text.startswith('{') and not content_text.startswith('['):
|
|
self.logger.error(f"[SearchHF Raw] Response doesn't look like JSON: {content_text[:100]}")
|
|
return {"status": "error", "message": f"Invalid JSON response format. Content: {content_text[:100]}"}
|
|
|
|
self.logger.info(f"[SearchHF Raw] Calling json.loads()...")
|
|
result_data = json.loads(content_text)
|
|
self.logger.info(f"[SearchHF Raw] JSON parsing successful!")
|
|
self.logger.info(f"[SearchHF Raw] Parsed data type: {type(result_data)}")
|
|
|
|
|
|
if not isinstance(result_data, dict):
|
|
self.logger.error(f"[SearchHF] Expected dict, got {type(result_data)}")
|
|
return {"status": "error", "message": f"Invalid response format: expected dict, got {type(result_data)}"}
|
|
|
|
|
|
if "status" not in result_data:
|
|
if "results" in result_data:
|
|
result_data["status"] = "success"
|
|
else:
|
|
result_data["status"] = "error"
|
|
result_data["message"] = "Unknown response format"
|
|
|
|
self.logger.info(f"[SearchHF] Successfully parsed JSON with status: {result_data.get('status')}")
|
|
return result_data
|
|
|
|
except json.JSONDecodeError as e:
|
|
self.logger.error(f"[SearchHF] JSON decode error: {e}, Raw content: {content_text}")
|
|
|
|
return {
|
|
"status": "error",
|
|
"message": f"JSON decode error: {str(e)}",
|
|
"raw_content": content_text[:200]
|
|
}
|
|
except Exception as e:
|
|
self.logger.error(f"[SearchHF] Unexpected error during JSON parsing: {e}")
|
|
return {
|
|
"status": "error",
|
|
"message": f"Unexpected parsing error: {str(e)}", "raw_content": content_text[:200]
|
|
}
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"[SearchHF] Search error: {e}")
|
|
return {"status": "error", "message": f"Search error: {str(e)}"}
|
|
|
|
async def _call_search_tool(self, query: str, max_results: int = 10, min_likes: int = 0) -> str:
|
|
"""Call the search tool via MCP service and return the formatted result for display."""
|
|
|
|
result_data = await self._call_search_tool_raw(query, max_results, min_likes)
|
|
|
|
|
|
if result_data.get("status") == "error":
|
|
return f"❌ {result_data.get('message', 'Unknown error')}"
|
|
|
|
if result_data.get("status") == "success":
|
|
stats = result_data.get("stats", {})
|
|
results = result_data.get("results", [])
|
|
|
|
formatted_output = f"✅ **SearchHF Results for '{query}'**\n\n"
|
|
formatted_output += f"📊 **Stats:**\n"
|
|
formatted_output += f"• Total spaces searched: {stats.get('total_spaces_searched', 0)}\n"
|
|
formatted_output += f"• Results returned: {stats.get('results_returned', 0)}\n"
|
|
formatted_output += f"• Verified MCP servers: {stats.get('verified_mcp_servers', 0)}\n\n"
|
|
|
|
if results:
|
|
formatted_output += "🔍 **Found MCP Spaces:**\n\n"
|
|
for i, space in enumerate(results[:5], 1):
|
|
formatted_output += f"**{i}. {space.get('title', 'Unknown')}**\n"
|
|
formatted_output += f" • Author: {space.get('author', 'Unknown')}\n"
|
|
formatted_output += f" • Likes: {space.get('likes', 0)}\n"
|
|
formatted_output += f" • MCP URL: {space.get('mcp_server_url', 'N/A')}\n"
|
|
formatted_output += f" • Verified: {'✅' if space.get('mcp_verified') else '❌'}\n"
|
|
formatted_output += f" • Tools: {space.get('mcp_tools_count', 0)}\n"
|
|
formatted_output += f" • Space URL: {space.get('huggingface_url', 'N/A')}\n\n"
|
|
|
|
formatted_output += f"\n📋 **Full JSON Result:**\n```json\n{json.dumps(result_data, indent=2)}\n```"
|
|
return formatted_output
|
|
else:
|
|
return f"❌ Search failed: {result_data.get('message', 'Unknown error')}"
|
|
|
|
|
|
|
|
def auto_register(game_engine) -> bool:
|
|
"""Auto-register the SearchHF Oracle addon with the game engine."""
|
|
try:
|
|
|
|
addon = SearchHFOracleAddon()
|
|
|
|
|
|
if addon.npc_config:
|
|
npc_service = game_engine.get_npc_service()
|
|
npc_service.register_npc(addon.npc_config['id'], addon.npc_config)
|
|
|
|
|
|
if not hasattr(game_engine.get_world(), 'addon_npcs'):
|
|
game_engine.get_world().addon_npcs = {}
|
|
game_engine.get_world().addon_npcs[addon.addon_id] = addon
|
|
|
|
|
|
addon.on_startup()
|
|
|
|
print(f"[SearchHFOracleAddon] Auto-registered successfully as self-contained addon")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"[SearchHFOracleAddon] Error during auto-registration: {e}")
|
|
return False
|
|
|