Chris4K's picture
Upload 90 files
87532e2 verified
"""
SearchHF Oracle Addon - MCP-based Hugging Face search interface with NPC spawning
"""
import asyncio
import logging
import time
import json
from typing import Dict, Any, Optional, List
import gradio as gr
from mcp import ClientSession
from mcp.client.sse import sse_client
from contextlib import AsyncExitStack
try:
# Try relative imports first (for local development)
from ..interfaces.npc_addon import NPCAddon
from .generic_mcp_server_addon import register_mcp_results_as_npcs, list_active_mcp_npcs, remove_mcp_npc, clear_all_mcp_npcs
except ImportError:
# Fall back to absolute imports (for HuggingFace Spaces)
try:
from src.interfaces.npc_addon import NPCAddon
from src.addons.generic_mcp_server_addon import register_mcp_results_as_npcs, list_active_mcp_npcs, remove_mcp_npc, clear_all_mcp_npcs
except ImportError:
# Final fallback - define minimal stubs to prevent import errors
print("[WARNING] Could not import NPC addon dependencies - running in limited mode")
class NPCAddon:
def __init__(self):
pass
@property
def addon_id(self):
return "searchhf_oracle"
@property
def addon_name(self):
return "SearchHF Oracle"
def register_mcp_results_as_npcs(results):
print(f"[WARNING] NPC registration not available - would register {len(results)} NPCs")
return []
def list_active_mcp_npcs():
return {}
def remove_mcp_npc(npc_id):
return False
def clear_all_mcp_npcs():
return 0
class SearchHFOracleAddon(NPCAddon):
"""SearchHF Oracle Addon for searching Hugging Face using MCP and spawning NPCs."""
def __init__(self):
# Initialize properties first
self.name = "SearchHF Oracle"
self.description = "Advanced Hugging Face search using specialized MCP server. Search and add any HF MCP to the game world."
self.version = "1.0.0"
self.author = "MMOP Team"
# NPC characteristics
self.character = "🔍"
self.position = (200, 100) # Position on the game map
self.npc_name = "SearchHF Oracle"
# MCP Configuration
self.mcp_server_url = "https://chris4k-searchhfformcp.hf.space/gradio_api/mcp/sse"
self.connected = False
self.last_connection_attempt = 0
self.connection_cooldown = 30 # 30 seconds between connection attempts
# Available tools
self.available_tools = []
# Logger
self.logger = logging.getLogger(__name__)
# Initialize parent (which will auto-register)
super().__init__()
# MCP client components
try:
self.loop = asyncio.get_event_loop()
except RuntimeError:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.session = None
self.exit_stack = None
self.tools = []
@property
def addon_id(self) -> str:
"""Unique identifier for this add-on"""
return "searchhf_oracle"
@property
def addon_name(self) -> str:
"""Display name for this add-on"""
return "SearchHF Oracle"
@property
def npc_config(self) -> Dict:
"""NPC configuration for auto-placement in world"""
return {
'id': 'searchhf_oracle',
'name': self.npc_name,
'x': self.position[0],
'y': self.position[1],
'char': self.character,
'type': 'oracle',
'personality': 'searchhf',
'description': self.description
}
@property
def ui_tab_name(self) -> str:
"""UI tab name for this addon"""
return "SearchHF Oracle"
def handle_command(self, player_id: str, command: str) -> str:
"""Handle player commands via private messages"""
try:
if command.startswith("searchhf "):
query = command[9:].strip()
if not query:
return "❌ Please provide a search query (e.g., 'searchhf sentiment analysis')"
result = self.loop.run_until_complete(self._call_search_tool(query))
return result
elif command.startswith("spawn_mcp "):
query = command[10:].strip()
if not query:
return "❌ Please provide a search query for MCP spawning"
# Get search results and spawn NPCs
try:
# Use raw method to get JSON data directly
self.logger.info(f"[SearchHF Spawn] ========== STARTING SPAWN PROCESS ==========")
self.logger.info(f"[SearchHF Spawn] Query: '{query}'")
self.logger.info(f"[SearchHF Spawn] Connected status: {self.connected}")
self.logger.info(f"[SearchHF Spawn] Available tools: {[tool.name for tool in self.tools]}")
self.logger.info(f"[SearchHF Spawn] About to call _call_search_tool_raw...")
result_data = self.loop.run_until_complete(self._call_search_tool_raw(query))
self.logger.info(f"[SearchHF Spawn] _call_search_tool_raw completed")
self.logger.info(f"[SearchHF Spawn] Received result_data type: {type(result_data)}")
self.logger.info(f"[SearchHF Spawn] Received result_data keys: {list(result_data.keys()) if isinstance(result_data, dict) else 'Not a dict'}")
self.logger.info(f"[SearchHF Spawn] Received result_data: {result_data}")
# Additional safety check - ensure result_data is a dict
if not isinstance(result_data, dict):
self.logger.error(f"[SearchHF Spawn] Expected dict, got {type(result_data)}: {result_data}")
return f"❌ Invalid response format from search: expected dict, got {type(result_data)}"
if result_data.get("status") == "success":
spaces = result_data.get("results", [])[:5] # Limit to 5 spawns
self.logger.info(f"[SearchHF Spawn] Found {len(spaces)} spaces")
if spaces:
self.logger.info(f"[SearchHF Spawn] First space data: {spaces[0]}")
else:
self.logger.warning(f"[SearchHF Spawn] No spaces found in results")
if not spaces:
return "❌ No MCP spaces found in search results"
# Validate that spaces have the required fields
valid_spaces = []
for i, space in enumerate(spaces):
if isinstance(space, dict) and space.get("mcp_server_url"):
valid_spaces.append(space)
else:
self.logger.warning(f"[SearchHF Spawn] Space {i} is invalid or missing mcp_server_url: {space}")
if not valid_spaces:
return f"❌ Found {len(spaces)} spaces but none have valid MCP server URLs"
self.logger.info(f"[SearchHF Spawn] {len(valid_spaces)} valid spaces ready for spawning")
spawned_npcs = register_mcp_results_as_npcs(valid_spaces)
self.logger.info(f"[SearchHF Spawn] register_mcp_results_as_npcs returned: {spawned_npcs}")
if spawned_npcs:
return f"✅ Spawned {len(spawned_npcs)} MCP NPCs from search: '{query}'\nNPCs: {', '.join(spawned_npcs)}"
else:
return f"❌ Failed to spawn NPCs. Found {len(valid_spaces)} valid spaces but none were successfully created."
else:
error_msg = result_data.get('message', 'Unknown error')
self.logger.error(f"[SearchHF Spawn] Search failed: {error_msg}")
return f"❌ Search failed: {error_msg}"
except Exception as e:
self.logger.error(f"[SearchHF Spawn] Exception during MCP spawning: {e}")
import traceback
traceback.print_exc()
return f"❌ Error during MCP spawning: {str(e)}"
elif command == "list_mcp_npcs":
active_npcs = list_active_mcp_npcs()
if not active_npcs:
return "🔌 No active MCP NPCs found"
npc_info = ["🔌 **Active MCP NPCs:**"]
for npc_id, info in active_npcs.items():
npc_info.append(f"• {info.get('name', npc_id)} - {info.get('description', 'No description')}")
return "\n".join(npc_info)
elif command.startswith("remove_mcp "):
npc_name = command[11:].strip()
npc_id = npc_name.lower().replace(" ", "_")
if remove_mcp_npc(npc_id):
return f"✅ Removed MCP NPC: {npc_name}"
else:
return f"❌ Could not find MCP NPC: {npc_name}"
elif command == "clear_mcp_npcs":
count = clear_all_mcp_npcs()
return f"✅ Removed {count} MCP NPCs from the game world"
elif command == "searchhf_connect":
result = self.connect_to_mcp()
if self.connected:
return f"✅ Connected to SearchHF MCP server with {len(self.tools)} tools"
else:
return f"❌ Connection failed: {result}"
elif command == "searchhf_status":
status = "🟢 Connected" if self.connected else "🔴 Disconnected"
return f"SearchHF Oracle Status: {status} | Tools: {len(self.tools)}"
elif command == "searchhf_tools":
if not self.tools:
return "❌ No tools available. Try connecting first with 'searchhf_connect'"
tools_list = [f"- {tool.name}: {getattr(tool, 'description', 'No description')}"
for tool in self.tools]
return f"🛠️ **Available SearchHF Tools:**\n" + "\n".join(tools_list)
elif command == "searchhf_help":
return """🔍 **SearchHF Oracle Help**
**Commands:**
- searchhf <query> - Search Hugging Face MCP spaces
- spawn_mcp <query> - Search and spawn MCP servers as NPCs
- list_mcp_npcs - List active MCP NPCs
- remove_mcp <name> - Remove MCP NPC by name
- clear_mcp_npcs - Remove all MCP NPCs
- searchhf_connect - Connect to MCP server
- searchhf_status - Check status
- searchhf_tools - List available tools
- searchhf_help - Show this help
**Examples:**
• searchhf sentiment analysis
• searchhf weather oracle
• spawn_mcp image generation
• list_mcp_npcs
• remove_mcp weather_oracle
⚡ **Powered by MCP (Model Context Protocol)**"""
return "❓ Unknown command. Use 'searchhf_help' for available commands."
except Exception as e:
self.logger.error(f"[{self.name}] Error handling command {command}: {e}")
return f"❌ Error processing command: {str(e)}"
def on_startup(self):
"""Called when the addon is loaded during game startup"""
try:
# Connect to MCP server and fetch tools
result = self.connect_to_mcp()
self.logger.info(f"[{self.name}] Startup connection: {result}")
except Exception as e:
self.logger.error(f"[{self.name}] Error during startup: {e}")
self.connected = False
def connect_to_mcp(self) -> str:
"""Synchronous connect to the SearchHF MCP server."""
current_time = time.time()
if current_time - self.last_connection_attempt < self.connection_cooldown:
return "⏳ Please wait before retrying connection..."
self.last_connection_attempt = current_time
try:
return self.loop.run_until_complete(self._connect())
except Exception as e:
self.connected = False
return f"❌ Connection failed: {e}"
async def _connect(self) -> str:
"""Async MCP connection using SSE."""
try:
# Clean up previous connection
if self.exit_stack:
await self.exit_stack.aclose()
self.exit_stack = AsyncExitStack()
# Connect to SSE MCP server
sse_transport = await self.exit_stack.enter_async_context(
sse_client(self.mcp_server_url)
)
read_stream, write_callable = sse_transport
self.session = await self.exit_stack.enter_async_context(
ClientSession(read_stream, write_callable)
)
await self.session.initialize()
# Get available tools
response = await self.session.list_tools()
self.tools = response.tools
self.connected = True
tool_names = [tool.name for tool in self.tools]
return f"✅ Connected to SearchHF MCP server!\nAvailable tools: {', '.join(tool_names)}"
except Exception as e:
self.connected = False
return f"❌ Connection failed: {str(e)}"
def get_interface(self) -> gr.Interface:
"""Create the Gradio interface for the SearchHF addon."""
def search_huggingface(query: str, max_results: int = 10, min_likes: int = 0) -> str:
"""Search Hugging Face using the MCP server."""
if not query.strip():
return "❌ Please enter a search query"
if not self.connected:
connect_result = self.connect_to_mcp()
if not self.connected:
return f"❌ Connection failed: {connect_result}"
try:
return self.loop.run_until_complete(self._call_search_tool(query, max_results, min_likes))
except Exception as e:
return f"❌ Search failed: {str(e)}"
def spawn_mcp_npcs(query: str, max_results: int = 5) -> str:
"""Search and automatically spawn MCP servers as NPCs."""
print(f"[DEBUG SearchHF] spawn_mcp_npcs called with query: '{query}', max_results: {max_results}")
if not query.strip():
return "❌ Please enter a search query"
if not self.connected:
connect_result = self.connect_to_mcp()
if not self.connected:
return f"❌ Connection failed: {connect_result}"
try:
# Get raw JSON data instead of formatted string
self.logger.info(f"[SearchHF Spawn UI] ========== UI SPAWN STARTING ==========")
self.logger.info(f"[SearchHF Spawn UI] Query: '{query}', Max results: {max_results}")
self.logger.info(f"[SearchHF Spawn UI] Connection status: {self.connected}")
self.logger.info(f"[SearchHF Spawn UI] Available tools: {[tool.name for tool in self.tools]}")
self.logger.info(f"[SearchHF Spawn UI] About to call _call_search_tool_raw...")
result_data = self.loop.run_until_complete(self._call_search_tool_raw(query, max_results))
self.logger.info(f"[SearchHF Spawn UI] _call_search_tool_raw completed")
# Debug: Log what we received
self.logger.info(f"[SearchHF Spawn UI] Received result_data type: {type(result_data)}")
self.logger.info(f"[SearchHF Spawn UI] Result data keys: {list(result_data.keys()) if isinstance(result_data, dict) else 'Not a dict'}")
self.logger.info(f"[SearchHF Spawn UI] Result data: {result_data}")
# Additional check to make sure we got a dict
if not isinstance(result_data, dict):
self.logger.error(f"[SearchHF Spawn UI] Expected dict from _call_search_tool_raw, got {type(result_data)}: {result_data}")
return f"❌ Invalid response format from search tool: {type(result_data)}"
if result_data.get("status") == "success":
spaces = result_data.get("results", [])[:max_results]
self.logger.info(f"[SearchHF Spawn UI] Found {len(spaces)} spaces to spawn")
if not spaces:
return "❌ No MCP spaces found in search results"
self.logger.info(f"[SearchHF Spawn UI] About to call register_mcp_results_as_npcs with {len(spaces)} spaces")
spawned_npcs = register_mcp_results_as_npcs(spaces)
self.logger.info(f"[SearchHF Spawn UI] register_mcp_results_as_npcs returned: {spawned_npcs}")
if spawned_npcs:
return f"✅ Spawned {len(spawned_npcs)} MCP NPCs from search: '{query}'\nNPCs: {', '.join(spawned_npcs)}"
else:
return f"❌ Failed to spawn NPCs. Found {len(spaces)} spaces but none were successfully created."
else:
error_msg = result_data.get('message', 'Unknown error')
self.logger.error(f"[SearchHF Spawn UI] Search failed: {error_msg}")
return f"❌ Search failed: {error_msg}"
except Exception as e:
self.logger.error(f"[SearchHF Spawn UI] Exception during spawning: {e}")
import traceback
self.logger.error(f"[SearchHF Spawn UI] Traceback: {traceback.format_exc()}")
return f"❌ Error spawning NPCs: {str(e)}"
def list_spawned_npcs() -> str:
"""List all currently spawned MCP NPCs."""
active_npcs = list_active_mcp_npcs()
if not active_npcs:
return "🔌 No active MCP NPCs found"
npc_info = ["🔌 **Active MCP NPCs:**\n"]
for npc_id, info in active_npcs.items():
npc_info.append(f"• **{info.get('name', npc_id)}** - {info.get('description', 'No description')}")
return "\n".join(npc_info)
def remove_spawned_npc(npc_name: str) -> str:
"""Remove a spawned MCP NPC by name."""
if not npc_name.strip():
return "❌ Please enter an NPC name to remove"
# Convert name to addon_id format
npc_id = npc_name.lower().replace(" ", "_")
if remove_mcp_npc(npc_id):
return f"✅ Removed MCP NPC: {npc_name}"
else:
return f"❌ Could not find MCP NPC: {npc_name}"
def clear_all_spawned_npcs() -> str:
"""Remove all spawned MCP NPCs."""
count = clear_all_mcp_npcs()
if count > 0:
return f"✅ Removed {count} MCP NPCs from the game world"
else:
return "ℹ️ No MCP NPCs to remove"
def connect_to_server() -> str:
"""Connect to the MCP server."""
try:
return self.connect_to_mcp()
except Exception as e:
return f"❌ Connection error: {str(e)}"
def get_status() -> str:
"""Get current connection status."""
status = "🟢 Connected" if self.connected else "🔴 Disconnected"
return f"""**SearchHF Oracle Status:**
- Connection: {status}
- MCP Server: {self.mcp_server_url}
- Available Tools: {len(self.tools)}
- Tools: {', '.join([tool.name for tool in self.tools]) if self.tools else 'None'}"""
def list_tools() -> str:
"""List available MCP tools."""
if not self.tools:
return "❌ No tools available. Try connecting first."
tools_info = ["**Available SearchHF Tools:**"]
for tool in self.tools:
tools_info.append(f"• **{tool.name}**: {getattr(tool, 'description', 'No description')}")
return "\n".join(tools_info)
# Create the interface
with gr.Blocks(title=f"{self.character} {self.name}") as interface:
gr.Markdown(f"""
# {self.character} {self.name}
Advanced Hugging Face search using specialized MCP integration.
Search models, datasets, papers, and spaces with enhanced capabilities.
**MCP Server:** `{self.mcp_server_url}`
""")
with gr.Tab("🔍 Search"):
with gr.Row():
query_input = gr.Textbox(
label="Search Query",
placeholder="e.g., sentiment analysis, weather oracle, image generation",
scale=3
)
search_btn = gr.Button("🔍 Search", variant="primary", scale=1)
with gr.Row():
max_results = gr.Slider(1, 20, 10, label="Max Results")
min_likes = gr.Slider(0, 50, 0, label="Min Likes")
search_output = gr.Textbox(
label="Search Results",
lines=15,
interactive=False
)
with gr.Tab("🎮 MCP NPC Manager"):
with gr.Row():
spawn_query = gr.Textbox(
label="Search & Spawn Query",
placeholder="Search for MCP servers to spawn as NPCs",
scale=3
)
spawn_btn = gr.Button("🎮 Spawn NPCs", variant="primary", scale=1)
spawn_results = gr.Slider(1, 10, 5, label="Max NPCs to Spawn")
spawn_output = gr.Textbox(label="Spawn Results", lines=3, interactive=False)
gr.Markdown("### Manage Spawned NPCs")
with gr.Row():
list_btn = gr.Button("📋 List NPCs")
clear_btn = gr.Button("🗑️ Clear All", variant="secondary")
with gr.Row():
remove_input = gr.Textbox(label="Remove NPC by Name", scale=3)
remove_btn = gr.Button("🗑️ Remove", scale=1)
npc_output = gr.Textbox(label="NPC Management", lines=10, interactive=False)
with gr.Tab("🔧 Connection"):
with gr.Row():
connect_btn = gr.Button("🔗 Connect", variant="primary")
status_btn = gr.Button("📊 Status")
tools_btn = gr.Button("🛠️ List Tools")
connection_output = gr.Textbox(
label="Connection Status",
lines=8,
interactive=False
)
with gr.Tab("ℹ️ Help"):
gr.Markdown("""
### 🔍 SearchHF Oracle Help
**Search Tab:**
- Enter queries like "sentiment analysis", "weather oracle", "image generation"
- Adjust max results and minimum likes filters
- Results show verified MCP servers with connection details
**MCP NPC Manager:**
- Search and automatically spawn MCP servers as game NPCs
- Manage spawned NPCs (list, remove, clear all)
- Each spawned NPC represents a working MCP server
**Connection Tab:**
- Connect to the SearchHF MCP server
- Check connection status and available tools
- Troubleshoot connection issues
**Commands (via private message):**
- `searchhf <query>` - Search Hugging Face MCP spaces
- `spawn_mcp <query>` - Search and spawn MCP servers as NPCs
- `list_mcp_npcs` - List active MCP NPCs
- `remove_mcp <name>` - Remove MCP NPC by name
- `clear_mcp_npcs` - Remove all MCP NPCs
- `searchhf_connect` - Connect to MCP server
- `searchhf_status` - Check status
- `searchhf_tools` - List available tools
- `searchhf_help` - Show help
**Example Commands:**
- `searchhf weather oracle`
- `spawn_mcp sentiment analysis`
- `list_mcp_npcs`
- `remove_mcp weather_oracle`
⚡ **Powered by MCP (Model Context Protocol)**
""")
# Wire up events
search_btn.click(
search_huggingface,
inputs=[query_input, max_results, min_likes],
outputs=[search_output]
)
query_input.submit(
search_huggingface,
inputs=[query_input, max_results, min_likes],
outputs=[search_output]
)
spawn_btn.click(
spawn_mcp_npcs,
inputs=[spawn_query, spawn_results],
outputs=[spawn_output]
)
list_btn.click(
list_spawned_npcs,
outputs=[npc_output]
)
clear_btn.click(
clear_all_spawned_npcs,
outputs=[npc_output]
)
remove_btn.click(
remove_spawned_npc,
inputs=[remove_input],
outputs=[npc_output] )
connect_btn.click(
connect_to_server,
outputs=[connection_output]
)
status_btn.click(
get_status,
outputs=[connection_output]
)
tools_btn.click(
list_tools,
outputs=[connection_output]
)
return interface
async def _call_search_tool_raw(self, query: str, max_results: int = 10, min_likes: int = 0) -> Dict:
"""Call the search tool and return raw JSON data for programmatic use."""
self.logger.info(f"[SearchHF Raw] ========== _call_search_tool_raw STARTING ==========")
self.logger.info(f"[SearchHF Raw] Query: '{query}', Max results: {max_results}, Min likes: {min_likes}")
self.logger.info(f"[SearchHF Raw] Connected: {self.connected}")
if not self.connected:
self.logger.info(f"[SearchHF Raw] Not connected, attempting to connect...")
conn_result = await self._connect()
self.logger.info(f"[SearchHF Raw] Connection attempt result: {conn_result}")
if not self.connected:
self.logger.error(f"[SearchHF Raw] Connection failed: {conn_result}")
return {"status": "error", "message": f"Connection failed: {conn_result}"}
# Find search tool
self.logger.info(f"[SearchHF Raw] Available tools: {[tool.name for tool in self.tools]}")
tool = next((t for t in self.tools if 'search' in t.name.lower()), None)
if not tool:
available_tools = [t.name for t in self.tools]
self.logger.error(f"[SearchHF Raw] SearchHF tool not found. Available tools: {', '.join(available_tools)}")
return {"status": "error", "message": f"SearchHF tool not found. Available tools: {', '.join(available_tools)}"}
self.logger.info(f"[SearchHF Raw] Found search tool: {tool.name}")
try:
# Call the tool with all required parameters based on the MCP server function signature
#TODO : expose all params
params = {
'query': query,
'max_results': max_results,
'min_likes': min_likes,
'author_filter': "",
'tag_filter': "",
'sort_by': "verified",
'created_after': "",
'include_private': False,
'verify_mcp': True,
'min_age_days': 0,
'max_age_days': 365 }
self.logger.info(f"[SearchHF Raw] Calling tool '{tool.name}' with params: {params}")
result = await self.session.call_tool(tool.name, params)
self.logger.info(f"[SearchHF Raw] Tool call completed successfully")
self.logger.info(f"[SearchHF Raw] Result type: {type(result)}")
self.logger.info(f"[SearchHF Raw] Result hasattr content: {hasattr(result, 'content')}")
if hasattr(result, 'content'):
self.logger.info(f"[SearchHF Raw] Result.content type: {type(result.content)}")
self.logger.info(f"[SearchHF Raw] Result.content: {result.content}")
# Extract content properly (same as weather addon)
content_text = ""
if hasattr(result, 'content') and result.content:
if isinstance(result.content, list):
for content_item in result.content:
if hasattr(content_item, 'text'):
content_text += content_item.text
elif hasattr(content_item, 'content'):
content_text += str(content_item.content)
else:
content_text += str(content_item)
elif hasattr(result.content, 'text'):
content_text = result.content.text
else:
content_text = str(result.content)
# Enhanced error handling for empty or malformed responses
self.logger.info(f"[SearchHF Raw] Extracted content_text length: {len(content_text)}")
self.logger.info(f"[SearchHF Raw] Content_text empty check: {not content_text or content_text.strip() == ''}")
if not content_text or content_text.strip() == "":
self.logger.error(f"[SearchHF Raw] Empty response from MCP server for query: {query}")
return {"status": "error", "message": "Empty response from SearchHF MCP server"}
content_text = content_text.strip()
self.logger.info(f"[SearchHF Raw] Stripped content_text length: {len(content_text)}")
# Debug: Log the raw content
self.logger.info(f"[SearchHF Raw] Raw content received (first 200 chars): {content_text[:200]}")
# Try to parse the JSON result with enhanced error handling
self.logger.info(f"[SearchHF Raw] About to attempt JSON parsing...")
try:
# Handle potential cases where the response might not be valid JSON
self.logger.info(f"[SearchHF Raw] Checking if content starts with JSON characters...")
self.logger.info(f"[SearchHF Raw] Starts with '{{': {content_text.startswith('{')}")
self.logger.info(f"[SearchHF Raw] Starts with '[': {content_text.startswith('[')}")
if not content_text.startswith('{') and not content_text.startswith('['):
self.logger.error(f"[SearchHF Raw] Response doesn't look like JSON: {content_text[:100]}")
return {"status": "error", "message": f"Invalid JSON response format. Content: {content_text[:100]}"}
self.logger.info(f"[SearchHF Raw] Calling json.loads()...")
result_data = json.loads(content_text)
self.logger.info(f"[SearchHF Raw] JSON parsing successful!")
self.logger.info(f"[SearchHF Raw] Parsed data type: {type(result_data)}")
# Validate that the result has the expected structure
if not isinstance(result_data, dict):
self.logger.error(f"[SearchHF] Expected dict, got {type(result_data)}")
return {"status": "error", "message": f"Invalid response format: expected dict, got {type(result_data)}"}
# If no status is present, assume success if results are there
if "status" not in result_data:
if "results" in result_data:
result_data["status"] = "success"
else:
result_data["status"] = "error"
result_data["message"] = "Unknown response format"
self.logger.info(f"[SearchHF] Successfully parsed JSON with status: {result_data.get('status')}")
return result_data
except json.JSONDecodeError as e:
self.logger.error(f"[SearchHF] JSON decode error: {e}, Raw content: {content_text}")
# Return a structured error response instead of trying to parse invalid JSON
return {
"status": "error",
"message": f"JSON decode error: {str(e)}",
"raw_content": content_text[:200]
}
except Exception as e:
self.logger.error(f"[SearchHF] Unexpected error during JSON parsing: {e}")
return {
"status": "error",
"message": f"Unexpected parsing error: {str(e)}", "raw_content": content_text[:200]
}
except Exception as e:
self.logger.error(f"[SearchHF] Search error: {e}")
return {"status": "error", "message": f"Search error: {str(e)}"}
async def _call_search_tool(self, query: str, max_results: int = 10, min_likes: int = 0) -> str:
"""Call the search tool via MCP service and return the formatted result for display."""
# Get raw data
result_data = await self._call_search_tool_raw(query, max_results, min_likes)
# Format for display
if result_data.get("status") == "error":
return f"❌ {result_data.get('message', 'Unknown error')}"
if result_data.get("status") == "success":
stats = result_data.get("stats", {})
results = result_data.get("results", [])
formatted_output = f"✅ **SearchHF Results for '{query}'**\n\n"
formatted_output += f"📊 **Stats:**\n"
formatted_output += f"• Total spaces searched: {stats.get('total_spaces_searched', 0)}\n"
formatted_output += f"• Results returned: {stats.get('results_returned', 0)}\n"
formatted_output += f"• Verified MCP servers: {stats.get('verified_mcp_servers', 0)}\n\n"
if results:
formatted_output += "🔍 **Found MCP Spaces:**\n\n"
for i, space in enumerate(results[:5], 1): # Show top 5 results
formatted_output += f"**{i}. {space.get('title', 'Unknown')}**\n"
formatted_output += f" • Author: {space.get('author', 'Unknown')}\n"
formatted_output += f" • Likes: {space.get('likes', 0)}\n"
formatted_output += f" • MCP URL: {space.get('mcp_server_url', 'N/A')}\n"
formatted_output += f" • Verified: {'✅' if space.get('mcp_verified') else '❌'}\n"
formatted_output += f" • Tools: {space.get('mcp_tools_count', 0)}\n"
formatted_output += f" • Space URL: {space.get('huggingface_url', 'N/A')}\n\n"
formatted_output += f"\n📋 **Full JSON Result:**\n```json\n{json.dumps(result_data, indent=2)}\n```"
return formatted_output
else:
return f"❌ Search failed: {result_data.get('message', 'Unknown error')}"
# Auto-registration function for the game engine
def auto_register(game_engine) -> bool:
"""Auto-register the SearchHF Oracle addon with the game engine."""
try:
# Create addon instance (this will auto-register via NPCAddon)
addon = SearchHFOracleAddon()
# Register NPC if needed
if addon.npc_config:
npc_service = game_engine.get_npc_service()
npc_service.register_npc(addon.npc_config['id'], addon.npc_config)
# Add to addon NPCs for command handling
if not hasattr(game_engine.get_world(), 'addon_npcs'):
game_engine.get_world().addon_npcs = {}
game_engine.get_world().addon_npcs[addon.addon_id] = addon
# Call startup
addon.on_startup()
print(f"[SearchHFOracleAddon] Auto-registered successfully as self-contained addon")
return True
except Exception as e:
print(f"[SearchHFOracleAddon] Error during auto-registration: {e}")
return False