Chris4K's picture
Upload 195 files
4c75d73 verified
"""
MCP (Model Context Protocol) service implementation.
"""
from typing import Dict, Optional, Any
from ..interfaces.service_interfaces import IMCPService
from ..services.player_service import PlayerService
from ..services.chat_service import ChatService
class MCPService(IMCPService):
"""Service for handling MCP integration and AI agent management."""
def __init__(self, player_service: PlayerService, chat_service: ChatService):
self.player_service = player_service
self.chat_service = chat_service
self.ai_agents: Dict[str, str] = {} # agent_id -> player_id mapping
def register_ai_agent(self, agent_id: str, name: str, agent_type: str = "ai_agent") -> str:
"""Register AI agent via MCP and create corresponding player."""
try:
# Create player for the AI agent
player = self.player_service.create_player(name, agent_type)
player.ai_agent_id = agent_id
# Add player to world
success = self.player_service.add_player_to_world(player)
if success:
# Map agent_id to player_id
self.ai_agents[agent_id] = player.id
# Send welcome message
self.chat_service.add_system_message(
f"🤖 AI Agent '{name}' has joined the game!"
)
return player.id
else:
return ""
except Exception as e:
print(f"[MCPService] Error registering AI agent: {e}")
return ""
def move_agent(self, agent_id: str, direction: str) -> Dict:
"""Move AI agent in specified direction."""
try:
player_id = self.ai_agents.get(agent_id)
if not player_id:
return {"success": False, "error": "Agent not found"}
success = self.player_service.move_player(player_id, direction)
if success:
player = self.player_service.get_player(player_id)
return {
"success": True,
"position": {"x": player.x, "y": player.y} if player else None,
"message": f"Moved {direction}"
}
else:
return {"success": False, "error": "Movement failed"}
except Exception as e:
print(f"[MCPService] Error moving agent: {e}")
return {"success": False, "error": str(e)}
def send_chat(self, agent_id: str, message: str) -> Dict:
"""Send chat message from AI agent."""
try:
player_id = self.ai_agents.get(agent_id)
if not player_id:
return {"success": False, "error": "Agent not found"}
success = self.chat_service.send_public_message(player_id, message)
if success:
return {"success": True, "message": "Message sent"}
else:
return {"success": False, "error": "Failed to send message"}
except Exception as e:
print(f"[MCPService] Error sending chat: {e}")
return {"success": False, "error": str(e)}
def get_game_state(self) -> Dict:
"""Get current game state for AI agents."""
try:
all_players = self.player_service.get_all_players()
recent_chat = self.chat_service.get_recent_messages(10)
# Format player data for AI consumption
players_data = {}
for player_id, player in all_players.items():
players_data[player_id] = {
"name": player.name,
"type": player.type,
"position": {"x": player.x, "y": player.y},
"level": player.level,
"hp": player.hp,
"is_active": player.is_active()
}
return {
"success": True,
"data": {
"players": players_data,
"recent_chat": recent_chat,
"world_info": {
"width": 500, # Current world dimensions
"height": 400,
"total_players": len(all_players)
}
}
}
except Exception as e:
print(f"[MCPService] Error getting game state: {e}")
return {"success": False, "error": str(e)}
def interact_with_npc(self, agent_id: str, npc_id: str, message: str) -> Dict:
"""Allow AI agent to interact with NPCs."""
try:
player_id = self.ai_agents.get(agent_id)
if not player_id:
return {"success": False, "error": "Agent not found"}
# This would typically go through NPCService
# For now, return a placeholder response
return {
"success": True,
"npc_response": f"Hello there! You said: {message}"
}
except Exception as e:
print(f"[MCPService] Error in NPC interaction: {e}")
return {"success": False, "error": str(e)}
def get_nearby_entities(self, agent_id: str, radius: int = 50) -> Dict:
"""Get entities near the AI agent."""
try:
player_id = self.ai_agents.get(agent_id)
if not player_id:
return {"success": False, "error": "Agent not found"}
agent_player = self.player_service.get_player(player_id)
if not agent_player:
return {"success": False, "error": "Agent player not found"}
all_players = self.player_service.get_all_players()
nearby_players = []
for other_id, other_player in all_players.items():
if other_id != player_id:
distance = agent_player.get_distance_to(other_player.x, other_player.y)
if distance <= radius:
nearby_players.append({
"id": other_id,
"name": other_player.name,
"type": other_player.type,
"position": {"x": other_player.x, "y": other_player.y},
"distance": round(distance, 1)
})
return {
"success": True,
"nearby_players": nearby_players,
"agent_position": {"x": agent_player.x, "y": agent_player.y}
}
except Exception as e:
print(f"[MCPService] Error getting nearby entities: {e}")
return {"success": False, "error": str(e)}
def unregister_ai_agent(self, agent_id: str) -> Dict:
"""Unregister AI agent and remove from game."""
try:
player_id = self.ai_agents.get(agent_id)
if not player_id:
return {"success": False, "error": "Agent not found"}
# Get player name for farewell message
player = self.player_service.get_player(player_id)
player_name = player.name if player else "Unknown"
# Remove player from world
success = self.player_service.remove_player_from_world(player_id)
if success:
# Remove from agent mapping
del self.ai_agents[agent_id]
# Send farewell message
self.chat_service.add_system_message(
f"🤖 AI Agent '{player_name}' has left the game!"
)
return {"success": True, "message": "Agent unregistered"}
else:
return {"success": False, "error": "Failed to remove agent"}
except Exception as e:
print(f"[MCPService] Error unregistering agent: {e}")
return {"success": False, "error": str(e)}
def get_registered_agents(self) -> Dict[str, str]:
"""Get all registered AI agents."""
return self.ai_agents.copy()
def is_agent_registered(self, agent_id: str) -> bool:
"""Check if an AI agent is registered."""
return agent_id in self.ai_agents