MMORPG_AI_NPC_MCP_CLIENT_SERVER / app_original_backup.py
Chris4K's picture
Upload 195 files
4c75d73 verified
#!/usr/bin/env python3
"""
COMPLETE MMORPG with MCP Server Integration - ALL FEATURES RESTORED
Fixed deadlock issue while preserving all original functionality
"""
import gradio as gr
import asyncio
import json
import time
import uuid
import random
import random
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from abc import ABC, abstractmethod
from mcp import ClientSession
from mcp.client.sse import sse_client
from contextlib import AsyncExitStack
# ============================================================================
# CORE GAME STATE MANAGEMENT
# ============================================================================
@dataclass
class Player:
    """A participant in the game world.

    Used both for human users and for AI agents connected through MCP
    (distinguished by ``type``).  Only ``id``, ``name`` and ``type`` are
    required; every other field has a starting default.
    """

    id: str                  # unique player identifier (key in GameWorld.players)
    name: str                # display name shown in chat and on the map
    type: str                # "human" or "ai_agent"
    x: int = 100             # map position, pixels from the left edge
    y: int = 100             # map position, pixels from the top edge
    level: int = 1
    hp: int = 100
    max_hp: int = 100
    gold: int = 50
    experience: int = 0
    last_active: float = 0   # time.time() of the last recorded activity
    session_hash: str = ""   # session hash used to map a UI session to this player
    ai_agent_id: str = ""    # For AI agents connected via MCP
class GameWorld:
    """Shared, lock-protected state of the running game.

    Owns the connected players, the NPC table, a rolling chat log and a
    rolling world-event log.  Methods that touch shared state take
    ``self._lock``; it is a re-entrant lock so these methods can call one
    another (e.g. ``add_player`` -> ``add_chat_message``) without deadlocking.
    """

    # Tuning constants (previously inline magic numbers).
    MAX_PLAYERS = 20
    MAX_X = 475                # largest allowed x coordinate
    MAX_Y = 375                # largest allowed y coordinate
    STEP = 25                  # distance covered by one move (players and NPCs)
    INTERACTION_RADIUS = 50    # "near" threshold for NPCs and players
    XP_PER_LEVEL = 100         # level N is left once experience >= N * XP_PER_LEVEL
    MAX_CHAT_MESSAGES = 50
    MAX_WORLD_EVENTS = 20
    PRIVATE_LOOKBACK = 20      # how many recent chat entries are scanned for private messages

    # direction -> (coordinate attribute, sign of the step, attribute of the upper bound)
    _MOVES = {
        "up": ("y", -1, "MAX_Y"),
        "down": ("y", 1, "MAX_Y"),
        "left": ("x", -1, "MAX_X"),
        "right": ("x", 1, "MAX_X"),
    }

    # Used when neither the NPC's personality nor its id has canned lines.
    _DEFAULT_RESPONSES = [
        "Hello there, traveler!",
        "How can I help you?",
        "Nice weather we're having."
    ]

    # Canned replies keyed by NPC personality or, failing that, NPC id.
    _NPC_RESPONSES = {
        'read2burn_mailbox': [
            "Would you like to send a secure message?",
            "I can help you with encrypted messaging.",
            "Your message will burn after reading!"
        ],
        'merchant': [
            "Welcome to my shop! What would you like to buy?",
            "I have the finest items in the realm!",
            "Special discount today - 10% off all potions!"
        ],
        'weather_oracle': [
            "The winds whisper of changes ahead...",
            "I sense a storm approaching...",
            "The weather spirits are restless today."
        ],
        'wise_teacher': [
            "Ah, a curious mind! What would you like to learn?",
            "Knowledge is the greatest treasure. Ask me anything!",
            "I have studied the ancient texts for decades.",
            "Wisdom comes through questioning. What puzzles you?"
        ],
        'comedian': [
            "Haha! Want to hear a joke? Why don't skeletons fight? They don't have the guts!",
            "What do you call a sleeping bull? A bulldozer! *laughs*",
            "I've got jokes for days! Life's too short not to laugh!",
            "Why did the scarecrow win an award? He was outstanding in his field!"
        ],
        'tough_trainer': [
            "Ready for combat training? Show me your stance!",
            "A true warrior trains every day. Are you committed?",
            "Strength comes from discipline and practice!",
            "The blade is an extension of your will. Focus!"
        ],
        'gentle_healer': [
            "Blessings upon you, traveler. Do you need healing?",
            "The light guides my hands. I can mend your wounds.",
            "Health of body and spirit go hand in hand.",
            "May the divine light restore your vitality!"
        ],
        'traveler': [
            "The road calls to me... always moving, always exploring!",
            "I've seen wonders beyond imagination in my travels.",
            "Adventure awaits around every corner!",
            "Sometimes the journey is more important than the destination."
        ],
        'mystical_sage': [
            "Magic flows through all things, young one.",
            "The arcane arts require patience and understanding.",
            "Ancient powers stir... can you feel them?",
            "Wisdom and magic are closely intertwined."
        ]
    }

    def __init__(self):
        """Create the world with its built-in NPCs and post the start-up chat message."""
        self.players: Dict[str, "Player"] = {}
        self.npcs: Dict[str, Dict] = {
            'read2burn_mailbox': {
                'id': 'read2burn_mailbox',
                'name': 'Secure Mailbox',
                'x': 200, 'y': 150,
                'char': '📮',
                'type': 'addon'
            },
            'merchant': {
                'id': 'merchant',
                'name': 'Tom the Trader',
                'x': 300, 'y': 200,
                'char': '🏪',
                'type': 'basic'
            },
            'weather_oracle': {
                'id': 'weather_oracle',
                'name': 'Weather Oracle',
                'x': 150, 'y': 300,
                'char': '🌤️',
                'type': 'mcp'
            },
            'scholar': {
                'id': 'scholar',
                'name': 'Professor Wise',
                'x': 100, 'y': 100,
                'char': '📚',
                'type': 'learning',
                'personality': 'wise_teacher'
            },
            'jester': {
                'id': 'jester',
                'name': 'Funny Pete',
                'x': 400, 'y': 100,
                'char': '🃏',
                'type': 'entertainment',
                'personality': 'comedian'
            },
            'warrior': {
                'id': 'warrior',
                'name': 'Captain Steel',
                'x': 350, 'y': 350,
                'char': '⚔️',
                'type': 'combat',
                'personality': 'tough_trainer'
            },
            'healer': {
                'id': 'healer',
                'name': 'Sister Grace',
                'x': 50, 'y': 250,
                'char': '💚',
                'type': 'healing',
                'personality': 'gentle_healer'
            },
            'wanderer': {
                'id': 'wanderer',
                'name': 'Roaming Rick',
                'x': 200, 'y': 200,
                'char': '🚶',
                'type': 'moving',
                'personality': 'traveler',
                'movement': {
                    'speed': 2,
                    'direction_x': 1,
                    'direction_y': 1,
                    'last_move': time.time()
                }
            },
            'sage': {
                'id': 'sage',
                'name': 'Ancient Sage',
                'x': 450, 'y': 250,
                'char': '🧙',
                'type': 'magic',
                'personality': 'mystical_sage'
            }
        }
        self.chat_messages: List[Dict] = []
        self.world_events: List[Dict] = []
        self.addon_npcs: Dict[str, 'NPCAddon'] = {}
        self._lock = threading.RLock()  # re-entrant: locked methods call each other
        # Monotonic id source; len(chat_messages) repeats once the log is trimmed.
        self._next_message_id = 0
        # Add initial chat message
        self.add_chat_message("System", "🎮 Game server started!")

    def add_player(self, player: "Player") -> bool:
        """Register ``player``; return False (and do nothing) when the world is full."""
        print(f"[GameWorld.add_player] Adding {player.name}")
        with self._lock:
            if len(self.players) >= self.MAX_PLAYERS:
                return False
            self.players[player.id] = player
            self.add_chat_message("System", f"🎮 {player.name} ({player.type}) joined the game!")
            print(f"[GameWorld] Player {player.name} added. Total players: {len(self.players)}")
            return True

    def remove_player(self, player_id: str) -> bool:
        """Remove a player and announce it; return False if the id is unknown."""
        with self._lock:
            player = self.players.pop(player_id, None)
            if player is None:
                return False
            self.add_chat_message("System", f"👋 {player.name} left the game!")
            return True

    def move_player(self, player_id: str, direction: str) -> bool:
        """Move a player one step in ``direction`` ("up"/"down"/"left"/"right").

        The position is clamped to the world bounds.  A successful move grants
        1 XP and may trigger a level-up.  Unknown directions leave the position
        unchanged but still refresh ``last_active`` and run the proximity check.

        Returns:
            True only if the player's position actually changed.
        """
        with self._lock:
            player = self.players.get(player_id)
            if player is None:
                return False
            start = (player.x, player.y)

            move = self._MOVES.get(direction)
            if move is not None:
                axis, sign, bound_name = move
                target = getattr(player, axis) + sign * self.STEP
                setattr(player, axis, max(0, min(getattr(self, bound_name), target)))
            player.last_active = time.time()

            moved = (player.x, player.y) != start
            if moved:
                # Small XP reward for movement
                player.experience += 1
                if player.experience >= player.level * self.XP_PER_LEVEL:
                    player.level += 1
                    player.max_hp += 10
                    player.hp = player.max_hp
                    player.gold += 10
                    self.add_chat_message("System", f"🎉 {player.name} reached level {player.level}!")

            # Check for NPC interactions
            self.check_npc_proximity(player_id)
            return moved

    def add_chat_message(self, sender: str, message: str, message_type: str = "public",
                         target: Optional[str] = None, sender_id: Optional[str] = None):
        """Append an entry to the chat log, keeping only the newest MAX_CHAT_MESSAGES.

        Args:
            sender: Display name shown for the message.
            message: Message text.
            message_type: "public" or one of the private types used by
                ``send_private_message``.
            target: Recipient id for private messages.
            sender_id: Originating player/NPC id for private messages.
        """
        with self._lock:
            self.chat_messages.append({
                'sender': sender,
                'message': message,
                'timestamp': time.strftime("%H:%M:%S"),
                'id': self._next_message_id,
                'type': message_type,
                'target': target,
                'sender_id': sender_id
            })
            self._next_message_id += 1
            if len(self.chat_messages) > self.MAX_CHAT_MESSAGES:
                self.chat_messages = self.chat_messages[-self.MAX_CHAT_MESSAGES:]

    def check_npc_proximity(self, player_id: str):
        """Return NPCs and other players within INTERACTION_RADIUS of the player.

        Each nearby NPC also produces a "<player> is near <npc>" world event.
        Returns an empty list when the player id is unknown.
        """
        with self._lock:
            player = self.players.get(player_id)
            if not player:
                return []
            nearby_entities = []
            # Check NPCs
            for npc_id, npc in self.npcs.items():
                distance = ((player.x - npc['x'])**2 + (player.y - npc['y'])**2)**0.5
                if distance < self.INTERACTION_RADIUS:  # Close enough to interact
                    self.add_world_event(f"{player.name} is near {npc['name']}")
                    nearby_entities.append({
                        'type': 'npc',
                        'id': npc_id,
                        'name': npc['name'],
                        'distance': distance
                    })
            # Check other players
            for other_id, other_player in self.players.items():
                if other_id == player_id:
                    continue
                distance = ((player.x - other_player.x)**2 + (player.y - other_player.y)**2)**0.5
                if distance < self.INTERACTION_RADIUS:  # Close enough to chat privately
                    nearby_entities.append({
                        'type': 'player',
                        'id': other_id,
                        'name': other_player.name,
                        'distance': distance
                    })
            return nearby_entities

    def send_private_message(self, sender_id: str, target_id: str, message: str) -> tuple[bool, str]:
        """Send a private message to an NPC or to another player.

        NPC targets are checked first; the NPC's reply is logged as a second
        private message addressed back to the sender.

        Returns:
            ``(success, status_text)``; on failure ``status_text`` describes
            the problem.
        """
        # NOTE: the world lock is deliberately not held here, because the NPC
        # reply may be produced by an add-on (see get_npc_response), which is
        # outside this class's control.
        sender = self.players.get(sender_id)
        if not sender:
            error_msg = f"Sender player {sender_id} not found"
            print(f"[PRIVATE_ERROR] {error_msg}")
            return False, error_msg
        print(f"[PRIVATE] Sending message from {sender.name} ({sender_id}) to {target_id}: {message}")

        # Check if target is an NPC
        if target_id in self.npcs:
            npc = self.npcs[target_id]
            print(f"[PRIVATE] Found NPC: {npc['name']} (ID: {target_id})")
            # Send player's message
            self.add_chat_message(
                f"🔒 {sender.name}",
                f"[Private to {npc['name']}]: {message}",
                "private_to_npc",
                target_id,
                sender_id
            )
            # Get and send NPC response
            npc_response = self.get_npc_response(target_id, message, sender_id)
            self.add_chat_message(
                f"🤖 {npc['name']}",
                npc_response,
                "private_from_npc",
                sender_id,
                target_id
            )
            print(f"[PRIVATE] NPC {npc['name']} responded: {npc_response}")
            return True, f"Message sent to {npc['name']}"

        # Check if target is another player
        target = self.players.get(target_id)
        if target:
            print(f"[PRIVATE] Found player: {target.name} (ID: {target_id})")
            self.add_chat_message(
                f"🔒 {sender.name}",
                f"[Private to {target.name}]: {message}",
                "private_to_player",
                target_id,
                sender_id
            )
            return True, f"Message sent to {target.name}"

        # Neither NPC nor player found
        available_npcs = list(self.npcs.keys())
        available_players = list(self.players.keys())
        error_msg = f"Target '{target_id}' not found. Available NPCs: {available_npcs}, Available players: {available_players}"
        print(f"[PRIVATE_ERROR] {error_msg}")
        return False, error_msg

    def get_npc_response(self, npc_id: str, message: str, player_id: Optional[str] = None) -> str:
        """Produce an NPC's reply to ``message``.

        If an add-on is registered for the NPC and a ``player_id`` is given,
        the add-on's ``handle_command`` result is returned.  Otherwise a random
        canned line is chosen by personality, then by NPC id, then from the
        generic defaults.
        """
        npc = self.npcs.get(npc_id)
        if not npc:
            return "I don't understand."
        # Check if this NPC has an addon that can handle commands
        if npc_id in self.addon_npcs and player_id:
            addon = self.addon_npcs[npc_id]
            print(f"[NPC_RESPONSE] Found addon for {npc_id}, delegating to handle_command")
            return addon.handle_command(player_id, message)
        # Try personality first, then fall back to npc_id, then default
        personality = npc.get('personality', npc_id)
        response_key = personality if personality in self._NPC_RESPONSES else npc_id
        return random.choice(self._NPC_RESPONSES.get(response_key, self._DEFAULT_RESPONSES))

    def get_private_messages_for_player(self, player_id: str) -> List[Dict]:
        """Return recent non-public chat entries sent by or addressed to the player.

        Only the newest PRIVATE_LOOKBACK chat entries are examined.  Returns an
        empty list when the player id is unknown.
        """
        if not self.players.get(player_id):
            return []
        private_messages = []
        for msg in self.chat_messages[-self.PRIVATE_LOOKBACK:]:
            msg_type = msg.get('type', 'public')
            if msg_type == 'public':
                continue
            # A private entry belongs to the player if they are either end of it.
            if player_id in (msg.get('target'), msg.get('sender_id')):
                print(f"[PRIVATE_FILTER] Found private message for {player_id}: {msg['sender']}: {msg['message']} (type: {msg_type})")
                private_messages.append(msg)
        print(f"[PRIVATE_DEBUG] Player {player_id} has {len(private_messages)} private messages")
        return private_messages

    def add_world_event(self, event: str):
        """Append a timestamped world event, keeping only the newest MAX_WORLD_EVENTS."""
        with self._lock:
            self.world_events.append({
                'event': event,
                'timestamp': time.time()
            })
            if len(self.world_events) > self.MAX_WORLD_EVENTS:
                self.world_events = self.world_events[-self.MAX_WORLD_EVENTS:]

    def update_moving_npcs(self):
        """Advance every NPC of type 'moving' whose movement interval has elapsed.

        An NPC moves STEP pixels along each axis, reversing direction on an
        axis when it reaches a world edge.  World events are logged for the
        move itself and for each player within INTERACTION_RADIUS of the new
        position.
        """
        current_time = time.time()
        with self._lock:
            for npc in self.npcs.values():
                # Only NPCs with a movement configuration take part
                if npc.get('type') != 'moving' or 'movement' not in npc:
                    continue
                movement = npc['movement']
                speed = movement.get('speed', 1)
                if not speed:
                    # speed 0 means "never moves"; also avoids dividing by zero below
                    continue
                # Faster speed = shorter interval
                if current_time - movement.get('last_move', 0) < 2.0 / speed:
                    continue

                old_x, old_y = npc['x'], npc['y']
                direction_x = movement.get('direction_x', 1)
                direction_y = movement.get('direction_y', 1)
                new_x = old_x + self.STEP * direction_x
                new_y = old_y + self.STEP * direction_y
                # Check boundaries and bounce if needed
                if new_x <= 0 or new_x >= self.MAX_X:
                    direction_x *= -1
                    new_x = max(0, min(self.MAX_X, new_x))
                if new_y <= 0 or new_y >= self.MAX_Y:
                    direction_y *= -1
                    new_y = max(0, min(self.MAX_Y, new_y))

                # Update NPC position and movement data
                npc['x'] = new_x
                npc['y'] = new_y
                movement['direction_x'] = direction_x
                movement['direction_y'] = direction_y
                movement['last_move'] = current_time

                if old_x == new_x and old_y == new_y:
                    continue
                self.add_world_event(f"🚶 {npc['name']} roams to ({int(new_x)}, {int(new_y)})")
                # Check if any players are now near this moved NPC
                for player in self.players.values():
                    distance = ((player.x - new_x)**2 + (player.y - new_y)**2)**0.5
                    if distance < self.INTERACTION_RADIUS:
                        self.add_world_event(f"👀 {player.name} notices {npc['name']} nearby")
# Global game world: the single module-level GameWorld instance that the
# add-ons and helper functions in this file read and mutate. Constructing it
# also posts the initial "Game server started!" system chat message.
game_world = GameWorld()
# ============================================================================
# NPC ADDON SYSTEM - RESTORED
# ============================================================================
class NPCAddon(ABC):
    """Abstract contract every NPC add-on has to fulfil."""

    @property
    @abstractmethod
    def addon_id(self) -> str:
        """Identifier string of the add-on."""

    @property
    @abstractmethod
    def addon_name(self) -> str:
        """Display name of the add-on."""

    @abstractmethod
    def get_interface(self) -> gr.Component:
        """Return Gradio interface for this add-on"""

    @abstractmethod
    def handle_command(self, player_id: str, command: str) -> str:
        """Handle player commands"""
class Read2BurnMailboxAddon(NPCAddon):
    """Self-destructing secure mailbox add-on.

    Messages are kept in memory, keyed by a random id.  A message can be read
    once (``reads_left`` starts at 1) and is deleted when read or when found
    expired.  Every create/read is appended to ``access_log``.
    """

    MESSAGE_TTL_SECONDS = 24 * 3600   # lifetime of an unread message
    MESSAGE_ID_LENGTH = 12

    # Help text shown at the top of the add-on panel.
    _HELP_MARKDOWN = (
        "\n"
        "## 🔥 Read2Burn Secure Mailbox\n"
        "**Features:**\n"
        "- Messages self-destruct after reading\n"
        "- End-to-end encryption simulation\n"
        "- 24-hour expiration\n"
        "- Anonymous delivery\n"
        "**Commands:**\n"
        "- `create Your secret message here` - Create new message\n"
        "- `read MESSAGE_ID` - Read message (destroys it!)\n"
        "- `list` - Show your created messages\n"
    )

    def __init__(self):
        self.messages: Dict[str, Dict] = {}    # message id -> message record
        self.access_log: List[Dict] = []       # chronological create/read audit trail

    @property
    def addon_id(self) -> str:
        return "read2burn_mailbox"

    @property
    def addon_name(self) -> str:
        return "🔥 Read2Burn Secure Mailbox"

    def get_interface(self) -> gr.Component:
        """Build the Gradio panel: help text, command box, response box, message table."""
        with gr.Column() as interface:
            gr.Markdown(self._HELP_MARKDOWN)
            with gr.Row():
                command_input = gr.Textbox(
                    label="Command",
                    placeholder="create Hello, this message will self-destruct!",
                    scale=3
                )
                send_btn = gr.Button("Send", variant="primary", scale=1)
            result_output = gr.Textbox(
                label="Mailbox Response",
                lines=5,
                interactive=False
            )
            # Message history
            message_history = gr.Dataframe(
                headers=["Message ID", "Created", "Status", "Reads Left"],
                label="Your Messages",
                interactive=False
            )

            def handle_mailbox_command(command: str):
                # The panel has no per-session identity, so the command is
                # attributed to the most recently active player (simplified approach).
                current_players = list(game_world.players.keys())
                if not current_players:
                    return "❌ No players in the game! Please join the game first.", []
                player_id = max(current_players, key=lambda pid: game_world.players[pid].last_active)
                player_name = game_world.players[player_id].name
                print(f"[Read2Burn] Command '{command}' from player {player_name} ({player_id})")
                result = self.handle_command(player_id, command)
                history = self.get_player_message_history(player_id)
                # Add player info to the result
                result = f"**Player:** {player_name}\n\n{result}"
                return result, history

            send_btn.click(
                handle_mailbox_command,
                inputs=[command_input],
                outputs=[result_output, message_history]
            )
            command_input.submit(
                handle_mailbox_command,
                inputs=[command_input],
                outputs=[result_output, message_history]
            )
        return interface

    def handle_command(self, player_id: str, command: str) -> str:
        """Dispatch a mailbox command: ``create <text>``, ``read <id>`` or ``list``.

        Anything else (including ``create``/``read`` without an argument)
        returns a usage hint.
        """
        parts = command.strip().split(' ', 1)
        cmd = parts[0].lower()
        has_argument = len(parts) > 1
        if cmd == "create" and has_argument:
            return self.create_message(player_id, parts[1])
        if cmd == "read" and has_argument:
            return self.read_message(player_id, parts[1])
        if cmd == "list":
            return self.list_player_messages(player_id)
        return "❓ Invalid command. Try: create <message>, read <id>, or list"

    def create_message(self, creator_id: str, content: str) -> str:
        """Store a new single-read message and return a confirmation containing its id."""
        message_id = self.generate_message_id()
        now = time.time()
        self.messages[message_id] = {
            'id': message_id,
            'creator': creator_id,
            'content': content,  # In production, encrypt this
            'created_at': now,
            'expires_at': now + self.MESSAGE_TTL_SECONDS,
            'reads_left': 1,
            'burned': False
        }
        self.access_log.append({
            'action': 'create',
            'message_id': message_id,
            'player_id': creator_id,
            'timestamp': now
        })
        return f"✅ **Message Created Successfully!**\n\n📝 **Message ID:** `{message_id}`\n🔗 Share this ID with the recipient\n⏰ Expires in 24 hours\n🔥 Burns after 1 read"

    def read_message(self, reader_id: str, message_id: str) -> str:
        """Return a message's content and burn it once no reads remain.

        Unknown, expired and already-burned ids yield an error string; expired
        and burned records are deleted on the spot.
        """
        # Ids never contain whitespace; tolerate stray spaces typed around one.
        message_id = message_id.strip()
        if message_id not in self.messages:
            return "❌ Message not found or already burned"
        message = self.messages[message_id]
        # Check expiry
        if time.time() > message['expires_at']:
            del self.messages[message_id]
            return "❌ Message expired and has been burned"
        # Check if already burned
        if message['burned'] or message['reads_left'] <= 0:
            del self.messages[message_id]
            return "❌ Message has already been burned"
        # Read the message
        content = message['content']
        message['reads_left'] -= 1
        self.access_log.append({
            'action': 'read',
            'message_id': message_id,
            'player_id': reader_id,
            'timestamp': time.time()
        })
        # Burn if no reads left
        if message['reads_left'] <= 0:
            message['burned'] = True
            del self.messages[message_id]
            burn_notice = "\n\n🔥 **This message has been BURNED and deleted forever!**"
        else:
            burn_notice = f"\n\n⚠️ **{message['reads_left']} reads remaining before burn**"
        return f"📖 **Message Content:**\n\n{content}{burn_notice}"

    def list_player_messages(self, player_id: str) -> str:
        """Return a formatted list of the stored messages created by the player."""
        player_messages = [
            msg for msg in self.messages.values()
            if msg['creator'] == player_id
        ]
        if not player_messages:
            return "📭 You have no active messages"
        now = time.time()
        lines = ["📨 **Your Active Messages:**\n\n"]
        for msg in player_messages:
            expires_in = int((msg['expires_at'] - now) / 3600)
            lines.append(f"🆔 `{msg['id']}` | ⏱️ {expires_in}h left | 👁️ {msg['reads_left']} reads\n")
        return "".join(lines)

    def get_player_message_history(self, player_id: str) -> List[List]:
        """Return table rows ``[short id, created HH:MM, status, reads left]`` for the player."""
        return [
            [
                msg['id'][:8] + "...",
                datetime.fromtimestamp(msg['created_at']).strftime("%H:%M"),
                "Burned" if msg['burned'] else "Active",
                str(msg['reads_left'])
            ]
            for msg in self.messages.values()
            if msg['creator'] == player_id
        ]

    def generate_message_id(self) -> str:
        """Generate an unused, unguessable alphanumeric message id.

        The id is the only thing protecting a message from being read by a
        stranger, so it is drawn from the ``secrets`` CSPRNG rather than
        ``random``.
        """
        import secrets
        import string
        alphabet = string.ascii_letters + string.digits
        while True:
            candidate = ''.join(secrets.choice(alphabet) for _ in range(self.MESSAGE_ID_LENGTH))
            if candidate not in self.messages:
                return candidate
# ============================================================================
# MCP ADDON SYSTEM - RESTORED
# ============================================================================
# Setup event loop for MCP client.
# A dedicated asyncio loop is created at import time and installed as the
# current loop of the importing thread. SimpleMCPClient's synchronous wrappers
# (connect / get_weather) drive their coroutines on it via
# loop.run_until_complete().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
class SimpleMCPClient:
    """MCP client for weather services.

    Synchronous facade over an SSE MCP session: ``connect`` and
    ``get_weather`` run the async implementation on the module-level ``loop``.
    """

    DEFAULT_SERVER_URL = "https://chris4k-weather.hf.space/gradio_api/mcp/sse"

    def __init__(self, server_url: Optional[str] = None):
        """Create a disconnected client.

        Args:
            server_url: SSE endpoint of the MCP server; defaults to
                ``DEFAULT_SERVER_URL`` when omitted.
        """
        self.session = None
        self.connected = False
        self.tools = []
        self.exit_stack = None
        self.server_url = server_url or self.DEFAULT_SERVER_URL
        # run_until_complete() raises if the loop is already running, so calls
        # arriving from several UI worker threads are serialised.  Re-entrant
        # because get_weather() calls connect() while holding it.
        self._loop_lock = threading.RLock()

    def connect(self) -> str:
        """Connect to the configured MCP server; return a human-readable status string."""
        with self._loop_lock:
            return loop.run_until_complete(self._connect())

    async def _connect(self) -> str:
        """Open the SSE transport and session, then cache the server's tool list.

        Never raises: any failure is reported in the returned string and
        leaves ``connected`` False.
        """
        try:
            # Clean up previous connection
            if self.exit_stack:
                await self.exit_stack.aclose()
            self.exit_stack = AsyncExitStack()
            # Connect to SSE MCP server
            sse_transport = await self.exit_stack.enter_async_context(
                sse_client(self.server_url)
            )
            read_stream, write_callable = sse_transport
            self.session = await self.exit_stack.enter_async_context(
                ClientSession(read_stream, write_callable)
            )
            await self.session.initialize()
            # Get available tools
            response = await self.session.list_tools()
            self.tools = response.tools
            self.connected = True
            tool_names = [tool.name for tool in self.tools]
            return f"✅ Connected to weather server!\nAvailable tools: {', '.join(tool_names)}"
        except Exception as e:
            self.connected = False
            return f"❌ Connection failed: {str(e)}"

    def get_weather(self, location: str) -> str:
        """Get weather for a location (city, country format).

        The location is validated before any network activity, so a blank
        input never triggers a connection attempt.  If the client is not yet
        connected, one automatic connection attempt is made.

        Returns:
            Formatted weather text, or a string starting with "❌" on failure.
        """
        if not location or not location.strip():
            return "❌ Please enter a location (e.g., 'Berlin, Germany')"
        with self._loop_lock:
            if not self.connected:
                # Try to auto-connect
                connect_result = self.connect()
                if not self.connected:
                    return f"❌ Failed to connect to weather server: {connect_result}"
            return loop.run_until_complete(self._get_weather(location))

    async def _get_weather(self, location: str) -> str:
        """Call the server's weather tool for ``location`` and format the reply.

        The first cached tool whose name contains "weather" is used.  Never
        raises: failures are reported in the returned string.
        """
        try:
            # Parse location: "City, Country" or just "City"
            if ',' in location:
                city, country = [part.strip() for part in location.split(',', 1)]
            else:
                city = location.strip()
                country = ""
            # Find the weather tool
            weather_tool = next((tool for tool in self.tools if 'weather' in tool.name.lower()), None)
            if not weather_tool:
                return "❌ Weather tool not found on server"
            # Call the tool
            params = {"city": city, "country": country}
            result = await self.session.call_tool(weather_tool.name, params)
            content_text = self._extract_text(result)
            if not content_text:
                return "❌ No content received from server"
            return self._format_weather(content_text)
        except Exception as e:
            return f"❌ Failed to get weather: {str(e)}"

    @staticmethod
    def _extract_text(result) -> str:
        """Flatten a tool result's ``content`` (list of items or single item) into text."""
        content = getattr(result, 'content', None)
        if not content:
            return ""
        if isinstance(content, list):
            chunks = []
            for item in content:
                if hasattr(item, 'text'):
                    chunks.append(item.text)
                elif hasattr(item, 'content'):
                    chunks.append(str(item.content))
                else:
                    chunks.append(str(item))
            return "".join(chunks)
        if hasattr(content, 'text'):
            return content.text
        return str(content)

    @staticmethod
    def _format_weather(content_text: str) -> str:
        """Render the tool's reply: known JSON shapes are prettified, the rest shown raw."""
        try:
            parsed = json.loads(content_text)
        except json.JSONDecodeError:
            # If not JSON, return as text
            return f"✅ Weather data:\n```\n{content_text}\n```"
        if not isinstance(parsed, dict):
            return f"✅ Raw result:\n{content_text}"
        if 'error' in parsed:
            return f"❌ Error: {parsed['error']}"
        if 'current_weather' in parsed:
            weather = parsed['current_weather']
            return (
                f"🌍 **{parsed.get('location', 'Unknown')}**\n\n"
                f"🌡️ Temperature: {weather.get('temperature_celsius', 'N/A')}°C\n"
                f"🌤️ Conditions: {weather.get('weather_description', 'N/A')}\n"
                f"💨 Wind: {weather.get('wind_speed_kmh', 'N/A')} km/h\n"
                f"💧 Humidity: {weather.get('humidity_percent', 'N/A')}%\n"
            )
        if 'temperature (°C)' in parsed:
            # Flat reply shape keyed by 'temperature (°C)'
            return (
                f"🌍 **{parsed.get('location', 'Unknown')}**\n\n"
                f"🌡️ Temperature: {parsed.get('temperature (°C)', 'N/A')}°C\n"
                f"🌤️ Weather Code: {parsed.get('weather_code', 'N/A')}\n"
                f"🕐 Timezone: {parsed.get('timezone', 'N/A')}\n"
                f"🕒 Local Time: {parsed.get('local_time', 'N/A')}\n"
            )
        return f"✅ Weather data:\n```json\n{json.dumps(parsed, indent=2)}\n```"
class MCPAddonWrapper(NPCAddon):
    """Game add-on that fronts an MCP server (weather lookups).

    NOTE(review): ``mcp_endpoint`` is stored but not handed to the client;
    ``SimpleMCPClient`` is built with its own default URL — confirm intended.
    """

    def __init__(self, mcp_endpoint: str, addon_name: str):
        self.mcp_endpoint = mcp_endpoint
        self._addon_name = addon_name
        slug = addon_name.lower().replace(' ', '_')
        self._addon_id = f"mcp_{slug}"
        # Initialize MCP client and connect right away; a failed connection
        # is only logged, never raised.
        self.mcp_client = SimpleMCPClient()
        try:
            status = self.mcp_client.connect()
            print(f"[MCP] {addon_name} connection: {status}")
        except Exception as e:
            print(f"[MCP] Failed to connect {addon_name}: {e}")

    @property
    def addon_id(self) -> str:
        return self._addon_id

    @property
    def addon_name(self) -> str:
        return self._addon_name

    def get_interface(self) -> gr.Component:
        """Build the weather panel: status line, location box, output box, button, examples."""
        sample_locations = [
            ["Berlin, Germany"],
            ["Tokyo, Japan"],
            ["New York, USA"],
            ["London, UK"],
            ["Sydney, Australia"]
        ]

        def handle_weather_request(location: str):
            # A player must be present in the global world before lookups are served
            player_ids = list(game_world.players.keys())
            if not player_ids:
                return "❌ No players in the game! Please join the game first!"
            if not location.strip():
                return "❌ Please enter a location (e.g., 'Berlin, Germany')"
            # The request is attributed to the most recently active player
            requester_id = max(player_ids, key=lambda pid: game_world.players[pid].last_active)
            # Get weather using MCP client
            weather_text = self.mcp_client.get_weather(location)
            # Log the interaction
            requester_name = game_world.players[requester_id].name
            print(f"[WEATHER] {requester_name} requested weather for {location}")
            return weather_text

        with gr.Column() as interface:
            gr.Markdown(f"## 🌤️ {self.addon_name}")
            gr.Markdown("*Ask for weather in any city! Format: 'City, Country' (e.g., 'Berlin, Germany')*")
            # Connection status
            gr.HTML(
                value="<div style='color: green;'>🟢 Auto-connecting to weather server...</div>"
            )
            location_input = gr.Textbox(
                label="Location",
                placeholder="e.g., Berlin, Germany",
                lines=1
            )
            weather_output = gr.Textbox(
                label="Weather Information",
                lines=8,
                interactive=False
            )
            get_weather_btn = gr.Button("🌡️ Get Weather", variant="primary")
            # Examples
            with gr.Row():
                gr.Examples(
                    examples=sample_locations,
                    inputs=[location_input]
                )
            # Button click and Enter in the text box trigger the same handler
            get_weather_btn.click(
                handle_weather_request,
                inputs=[location_input],
                outputs=[weather_output]
            )
            location_input.submit(
                handle_weather_request,
                inputs=[location_input],
                outputs=[weather_output]
            )
        return interface

    def handle_command(self, player_id: str, command: str) -> str:
        """Handle weather commands from private messages"""
        # Trim whitespace and drop a single leading slash, if any
        query = command.strip()
        query = query[1:] if query.startswith('/') else query
        if not query:
            return "🌤️ Weather Oracle: Please tell me a location! Format: 'City, Country' (e.g., 'Berlin, Germany')"
        # Get weather for the location
        weather_text = self.mcp_client.get_weather(query)
        # Log the interaction
        if player_id in game_world.players:
            requester_name = game_world.players[player_id].name
            print(f"[WEATHER_PM] {requester_name} requested weather for {query}")
        return f"🌤️ **Weather Oracle**: {weather_text}"
# ============================================================================
# MCP TOOLS FOR AI AGENTS - RESTORED
# ============================================================================
class GradioMCPTools:
    """MCP tools integrated directly into Gradio app.

    Lets AI agents join the world as players and act through it.  Agents are
    tracked in ``ai_agents`` keyed by their MCP client id.
    """

    def __init__(self, game_world: "GameWorld"):
        self.game_world = game_world
        self.ai_agents: Dict[str, "Player"] = {}   # mcp_client_id -> agent's Player

    def register_ai_agent(self, agent_name: str, mcp_client_id: Optional[str] = None) -> str:
        """Register an AI agent as a player at a random position.

        Args:
            agent_name: Display name of the agent.
            mcp_client_id: Key under which the agent is tracked.  When omitted,
                the returned agent id is used as the key, so the caller can
                pass that id to the other methods.  (Previously a separate
                random key was generated and never returned, leaving the
                agent uncontrollable.)

        Returns:
            The new player's id.

        Raises:
            RuntimeError: if the world is full.
        """
        agent_id = f"ai_{uuid.uuid4().hex[:8]}"
        if mcp_client_id is None:
            mcp_client_id = agent_id
        agent_player = Player(
            id=agent_id,
            name=agent_name,
            type="ai_agent",
            x=random.randint(50, 450),
            y=random.randint(50, 350),
            ai_agent_id=mcp_client_id,
            last_active=time.time()
        )
        if not self.game_world.add_player(agent_player):
            raise RuntimeError("Game is full, cannot add AI agent")
        self.ai_agents[mcp_client_id] = agent_player
        return agent_id

    def move_ai_agent(self, mcp_client_id: str, direction: str) -> Dict:
        """Move AI agent in the game world.

        Returns the move result, the new position, entities within the default
        radius (under the key ``nearby_players``) and the last five world
        events; or ``{"error": ...}`` for an unregistered client id.
        """
        agent = self.ai_agents.get(mcp_client_id)
        if agent is None:
            return {"error": "AI agent not registered"}
        success = self.game_world.move_player(agent.id, direction)
        return {
            "success": success,
            "new_position": {"x": agent.x, "y": agent.y},
            "nearby_players": self.get_nearby_entities(agent.id),
            "world_events": self.game_world.world_events[-5:]
        }

    def ai_agent_chat(self, mcp_client_id: str, message: str) -> Dict:
        """AI agent sends a public chat message."""
        agent = self.ai_agents.get(mcp_client_id)
        if agent is None:
            return {"error": "AI agent not registered"}
        self.game_world.add_chat_message(f"🤖 {agent.name}", message)
        return {"success": True, "message": "Chat message sent"}

    def get_game_state_for_ai(self, mcp_client_id: str) -> Dict:
        """Get current game state for AI agent.

        Includes the agent's own fields, nearby entities, the last ten chat
        entries, the last five world events and all NPC ids.
        """
        agent = self.ai_agents.get(mcp_client_id)
        if agent is None:
            return {"error": "AI agent not registered"}
        return {
            "agent_status": asdict(agent),
            "nearby_entities": self.get_nearby_entities(agent.id),
            "recent_chat": self.game_world.chat_messages[-10:],
            "world_events": self.game_world.world_events[-5:],
            "available_npcs": list(self.game_world.npcs.keys())
        }

    def get_nearby_entities(self, player_id: str, radius: int = 100) -> List[Dict]:
        """Get entities near a player.

        Returns other players first, then NPCs, each with its position and its
        distance (rounded to one decimal) when that distance is <= ``radius``.
        Returns an empty list for an unknown player id.
        """
        player = self.game_world.players.get(player_id)
        if not player:
            return []

        def distance_to(x, y):
            return ((player.x - x)**2 + (player.y - y)**2)**0.5

        nearby = []
        # Check other players
        for other_player in self.game_world.players.values():
            if other_player.id == player_id:
                continue
            distance = distance_to(other_player.x, other_player.y)
            if distance <= radius:
                nearby.append({
                    "type": "player",
                    "name": other_player.name,
                    "player_type": other_player.type,
                    "position": {"x": other_player.x, "y": other_player.y},
                    "distance": round(distance, 1)
                })
        # Check NPCs
        for npc in self.game_world.npcs.values():
            distance = distance_to(npc['x'], npc['y'])
            if distance <= radius:
                nearby.append({
                    "type": "npc",
                    "name": npc['name'],
                    "npc_id": npc['id'],
                    "position": {"x": npc['x'], "y": npc['y']},
                    "distance": round(distance, 1)
                })
        return nearby
# Global MCP tools: module-level GradioMCPTools instance bound to the shared
# game_world, through which AI agents are registered and act.
mcp_tools = GradioMCPTools(game_world)
# ============================================================================
# HELPER FUNCTIONS - RESTORED
# ============================================================================
def get_player_id_from_session(session_hash: str) -> Optional[str]:
    """Get player ID from session hash.

    Returns the id of the first player in the global world whose
    ``session_hash`` equals the given one, or None if there is none.

    An empty or missing hash never matches: players that were created without
    a session (``session_hash`` left at its default ``""``, e.g. AI agents)
    must not be handed to an anonymous caller.
    """
    if not session_hash:
        return None
    return next(
        (player.id for player in game_world.players.values()
         if player.session_hash == session_hash),
        None,
    )
def create_game_world_html() -> str:
    """Render the game world as HTML.

    Draws a 500x400 board with a 25px grid, every NPC (icon plus name label)
    and every player (icon plus "name (Lv.N)" label) from the global world.
    Names and icons are HTML-escaped, because player names are free text
    supplied by users and would otherwise be injected into the page as markup.
    """
    from html import escape  # local import: the module has no top-level `html` import

    parts = [
        """
<div style="width: 500px; height: 400px; background: linear-gradient(45deg, #2d5a27, #3d6b37);
position: relative; border: 2px solid #8B4513; border-radius: 10px; margin: 10px auto;
box-shadow: 0 4px 8px rgba(0,0,0,0.3);">
""",
        # Draw grid pattern
        """
<div style="position: absolute; width: 100%; height: 100%;
background-image:
linear-gradient(rgba(255,255,255,0.1) 1px, transparent 1px),
linear-gradient(90deg, rgba(255,255,255,0.1) 1px, transparent 1px);
background-size: 25px 25px;">
</div>
""",
    ]

    # Draw NPCs
    for npc in game_world.npcs.values():
        npc_icon = escape(str(npc['char']))
        npc_label = escape(str(npc['name']))
        parts.append(f"""
<div style="position: absolute; left: {npc['x']}px; top: {npc['y']}px;
font-size: 24px; text-shadow: 2px 2px 4px rgba(0,0,0,0.8);
transition: all 0.3s ease;">
{npc_icon}
</div>
<div style="position: absolute; left: {npc['x']-10}px; top: {npc['y']-20}px;
background: rgba(0,0,0,0.7); color: white; padding: 2px 6px;
border-radius: 3px; font-size: 10px; font-weight: bold;">{npc_label}</div>
""")

    # Draw players
    for player in game_world.players.values():
        is_agent = player.type == "ai_agent"
        char = "🤖" if is_agent else "🧝‍♂️"
        color = "gold" if is_agent else "lightblue"
        player_label = escape(str(player.name))
        parts.append(f"""
<div style="position: absolute; left: {player.x}px; top: {player.y}px;
font-size: 20px; text-shadow: 2px 2px 4px rgba(0,0,0,0.8);
transition: all 0.3s ease; z-index: 10;">
{char}
</div>
<div style="position: absolute; left: {player.x-15}px; top: {player.y-20}px;
background: rgba(0,0,0,0.8); color: {color}; padding: 2px 6px;
border-radius: 3px; font-size: 9px; font-weight: bold; z-index: 11;">
{player_label} (Lv.{player.level})
</div>
""")

    parts.append("</div>")
    return "".join(parts)
def get_player_stats_display(player_id: str) -> Dict:
    """Build the stats dictionary shown for one player.

    For a missing or unknown ``player_id`` a two-key "not connected"
    dictionary is returned instead.
    """
    known = bool(player_id) and player_id in game_world.players
    if not known:
        return {"status": "❌ Not connected", "info": "Join the game to see your stats"}

    p = game_world.players[player_id]
    xp_needed = p.level * 100
    stats = {"status": "🟢 Connected"}
    stats["name"] = p.name
    stats["type"] = p.type
    stats["level"] = p.level
    stats["hp"] = f"{p.hp}/{p.max_hp}"
    stats["gold"] = p.gold
    stats["experience"] = f"{p.experience}/{xp_needed}"
    stats["position"] = f"({p.x}, {p.y})"
    stats["last_update"] = time.strftime("%H:%M:%S")
    # Only the first 8 characters of the player id are shown
    stats["session_id"] = p.id[:8] + "..."
    return stats
# ============================================================================
# MAIN APPLICATION WITH ALL FEATURES - RESTORED
# ============================================================================
def create_mmorpg_interface():
"""Create the complete MMORPG interface with all features"""
# Initialize add-ons
read2burn_addon = Read2BurnMailboxAddon()
game_world.addon_npcs['read2burn_mailbox'] = read2burn_addon
# Example MCP add-on (simulated)
weather_mcp_addon = MCPAddonWrapper("http://localhost:8001/mcp", "Weather Oracle")
game_world.addon_npcs['weather_oracle'] = weather_mcp_addon
with gr.Blocks(
title="🎮 MMORPG with Complete MCP Integration"
) as demo:
# Keyboard status indicator
keyboard_status = gr.HTML(
value="<div id='keyboard-status' style='background:#e8f5e8;border:1px solid #4caf50;padding:8px;border-radius:4px;margin:8px 0;'>🎮 Keyboard loading...</div>"
)
# Inject keyboard control script
gr.HTML(r"""
<script>
const gameKeyboard = { enabled: false, buttons: {} };
function initKeyboard() {
const btns = document.querySelectorAll('button');
btns.forEach(btn => {
const t = btn.textContent.trim();
if (t==='↑') gameKeyboard.buttons.up=btn;
if (t==='↓') gameKeyboard.buttons.down=btn;
if (t==='←') gameKeyboard.buttons.left=btn;
if (t==='→') gameKeyboard.buttons.right=btn;
if (t.includes('⚔️')) gameKeyboard.buttons.action=btn;
});
const status = document.getElementById('keyboard-status');
if (Object.keys(gameKeyboard.buttons).length>=4) {
status.innerHTML='<span style="color:#2e7d32;">🎮 Keyboard ready! Use WASD or arrows</span>';
gameKeyboard.enabled=true;
} else {
status.innerHTML='<span style="color:#d32f2f;">❌ Keyboard init...found '+Object.keys(gameKeyboard.buttons).length+'/4</span>';
}
}
document.addEventListener('DOMContentLoaded',()=>{ initKeyboard(); });
document.addEventListener('keydown',e=>{
if (!gameKeyboard.enabled) return;
const tag=e.target.tagName.toLowerCase(); if (tag==='input'||tag==='textarea') return;
let btn;
switch(e.code){
case'ArrowUp':case'KeyW':btn=gameKeyboard.buttons.up;break;
case'ArrowDown':case'KeyS':btn=gameKeyboard.buttons.down;break;
case'ArrowLeft':case'KeyA':btn=gameKeyboard.buttons.left;break;
case'ArrowRight':case'KeyD':btn=gameKeyboard.buttons.right;break;
case'Space':btn=gameKeyboard.buttons.action;break;
}
if (btn){ e.preventDefault(); btn.click(); initKeyboard(); }
});
</script>
""")
gr.Markdown("""
<div style="text-align: center; padding: 20px; background: linear-gradient(90deg, #667eea 0%, #764ba2 100%);
color: white; border-radius: 10px; margin-bottom: 20px;">
<h1>🎮 MMORPG with Complete MCP Integration</h1>
<p><strong>All features restored! Keyboard controls, NPCs, MCP, Read2Burn & more!</strong></p>
<p>🌟 Real-time multiplayer • 🤖 AI agent support • 🔥 Read2Burn messaging • 🔌 MCP add-ons • ⌨️ Keyboard controls</p>
</div>
""")
# Player session state
player_state = gr.State({})
with gr.Tabs():
# Main Game Tab
with gr.Tab("🌍 Game World"):
with gr.Row():
with gr.Column(scale=2):
# Player registration
with gr.Group():
gr.Markdown("### 🎮 Join the Adventure")
with gr.Row():
player_name = gr.Textbox(
label="Player Name",
placeholder="Enter your character name",
scale=3
)
join_btn = gr.Button("Join Game", variant="primary", scale=1)
leave_btn = gr.Button("Leave Game", variant="secondary", scale=1)
# Keyboard controls info - ENHANCED
gr.Markdown("""
### ⌨️ Controls
**Mouse:** Click movement buttons below
**Keyboard:** Use **WASD** or **Arrow Keys** for movement
💡 *Press F12 → Console to see keyboard debug info*
""")
# Game world view
game_view = gr.HTML(
value=create_game_world_html()
)
# Movement controls
gr.Markdown("### 🕹️ Movement Controls")
with gr.Row():
gr.HTML("")
move_up = gr.Button("↑", size="lg", scale=1)
gr.HTML("")
with gr.Row():
move_left = gr.Button("←", size="lg", scale=1)
action_btn = gr.Button("⚔️", size="lg", scale=1, variant="secondary")
move_right = gr.Button("→", size="lg", scale=1)
with gr.Row():
gr.HTML("")
move_down = gr.Button("↓", size="lg", scale=1)
gr.HTML("")
with gr.Column(scale=1):
# Player info
player_info = gr.JSON(
label="🧝‍♂️ Player Stats",
value={"status": "Not connected", "info": "Join the game to see your stats"}
)
# Online players
online_players = gr.Dataframe(
headers=["Name", "Type", "Level"],
label="👥 Online Players",
interactive=False
)
# World events
world_events = gr.Textbox(
label="🌍 World Events & NPC Interactions",
lines=4,
interactive=False,
placeholder="World events will appear here...\n\n💡 Tip: Walk near NPCs (📮🏪🌤️) to interact with them!\nThen visit the 'NPC Add-ons' tab to use their features."
)
# Chat system
with gr.Row():
with gr.Column(scale=4):
chat_display = gr.Chatbot(
label="💬 Game Chat", height=200,
type='messages',
value=[{"role": "assistant", "content": "Welcome! Join the game to start chatting!"}]
)
with gr.Row():
chat_input = gr.Textbox(
placeholder="Type your message...",
scale=4,
container=False
)
chat_send = gr.Button("Send", scale=1, variant="primary")
# Private Chat Section - Multi-Tab Implementation
with gr.Column(scale=2):
# Auto-refresh toggle
with gr.Row():
auto_refresh_enabled = gr.Checkbox(
label="Auto-refresh (2s)",
value=True,
info="Toggle to preserve manual selections"
)
proximity_info = gr.HTML(
value="<div style='text-align: center; color: #666;'>🔍 Move near NPCs or players to chat privately</div>",
label="📱 Nearby Entities"
)
# Private chat interface with multi-tab support
with gr.Group(visible=False) as private_chat_group:
nearby_entities = gr.Dropdown(
label="💬 Start new chat with",
choices=[],
interactive=True
)
with gr.Row():
start_chat_btn = gr.Button("Start Chat", variant="primary", scale=1)
clear_all_tabs_btn = gr.Button("Clear All", variant="secondary", scale=1)
# Chat tabs container
with gr.Column() as chat_tabs_container:
# This will be dynamically populated with chat tabs
chat_tabs_state = gr.State({}) # Store active chat tabs: {entity_id: {tab_info, history, pinned}}
active_tabs_display = gr.HTML(
value="<div style='text-align: center; color: #666; padding: 10px;'>No active chats</div>",
label="Active Chats"
)
# Current active chat display
current_chat_display = gr.Chatbot(
label="🔒 Private Messages",
height=150,
type='messages',
value=[],
visible=False
)
with gr.Row(visible=False) as chat_input_row:
private_message_input = gr.Textbox(
placeholder="Type private message...",
scale=4,
container=False
)
private_send_btn = gr.Button("Send", scale=1, variant="secondary")
# NPC Add-ons Tab - RESTORED
with gr.Tab("🤖 NPC Add-ons"):
gr.Markdown("## Available NPC Add-ons")
gr.Markdown("*Extensible plugin system - each NPC can have unique functionality!*")
with gr.Tabs() as addon_tabs:
# Read2Burn Mailbox Add-on
with gr.Tab("🔥 Read2Burn Mailbox"):
read2burn_addon.get_interface()
# Weather Oracle MCP Add-on
with gr.Tab("🌤️ Weather Oracle (MCP)"):
weather_mcp_addon.get_interface()
# Add-on Manager
with gr.Tab("⚙️ Manage Add-ons"):
gr.Markdown("### Install New Add-ons")
with gr.Row():
addon_type = gr.Dropdown(
choices=["Python Plugin", "MCP Server", "Web Service"],
label="Add-on Type",
value="MCP Server"
)
with gr.Row():
addon_url = gr.Textbox(
label="Add-on URL/Path",
placeholder="http://localhost:8001/mcp or path/to/plugin.py"
)
with gr.Row():
addon_name = gr.Textbox(
label="Display Name",
placeholder="My Custom Add-on"
)
install_btn = gr.Button("Install Add-on", variant="primary")
install_status = gr.Textbox(label="Status", interactive=False)
def install_addon(addon_type: str, addon_url: str, addon_name: str):
    """Install an add-on from the "Manage Add-ons" form and return a status line.

    Only the "MCP Server" type is implemented: it wraps the URL in an
    ``MCPAddonWrapper`` and registers it in ``game_world.addon_npcs`` under the
    wrapper's ``addon_id``. Any other type yields a "not yet implemented" note.
    """
    # Both fields are required before anything else is considered.
    if not (addon_url and addon_name):
        return "❌ Please provide both URL and name"

    if addon_type != "MCP Server":
        return f"🚧 {addon_type} installation not yet implemented"

    try:
        wrapper = MCPAddonWrapper(addon_url, addon_name)
        game_world.addon_npcs[wrapper.addon_id] = wrapper
        return f"✅ Successfully installed MCP add-on '{addon_name}'"
    except Exception as exc:
        # Report the failure in the status box instead of raising into Gradio.
        return f"❌ Failed to install add-on: {exc}"
install_btn.click(
install_addon,
inputs=[addon_type, addon_url, addon_name],
outputs=[install_status]
)
# MCP Integration Tab - RESTORED
with gr.Tab("🔌 MCP Integration"):
gr.Markdown("""
## MCP Integration for AI Agents
AI agents can connect to this game via MCP and participate as players!
### Available MCP Tools:
- `register_ai_agent(name)` - Join the game as an AI player
- `move_agent(direction)` - Move around the world
- `send_chat(message)` - Chat with other players
- `get_game_state()` - Get current world information
- `interact_with_npc(npc_id, message)` - Use NPC add-ons
### API Endpoints (when launched with api_open=True):
- `/api/register_ai_agent` - Register new AI agent
- `/api/move_ai_agent` - Move AI agent
- `/api/ai_agent_chat` - Send chat message
- `/api/get_game_state_for_ai` - Get game state
""")
# MCP server info
mcp_info = gr.JSON(
label="MCP Server Information",
value={
"server_status": "🟢 Active",
"server_url": "This Gradio app serves as MCP server",
"tools_available": [
"register_ai_agent",
"move_agent",
"send_chat",
"get_game_state",
"interact_with_npc"
],
"active_ai_agents": 0,
"total_players": 0
}
)
# AI Agent simulator (for testing)
with gr.Group():
gr.Markdown("### 🧪 Test AI Agent")
gr.Markdown("*Use this to simulate AI agent connections for testing*")
with gr.Row():
ai_name = gr.Textbox(
label="AI Agent Name",
placeholder="Claude the Explorer",
scale=3
)
register_ai_btn = gr.Button("Register AI Agent", variant="primary", scale=1)
with gr.Row():
ai_action = gr.Dropdown(
choices=["move up", "move down", "move left", "move right", "chat"],
label="AI Action",
scale=2
)
ai_message = gr.Textbox(
label="AI Message (for chat)",
placeholder="Hello humans!",
scale=3
)
execute_ai_btn = gr.Button("Execute AI Action", variant="secondary")
ai_result = gr.Textbox(label="AI Action Result", interactive=False, lines=5)
# ====================================================================
# KEYBOARD CONTROLS - RESTORED AND ENHANCED
# ====================================================================
# JavaScript code to inject into head (FIXED SYNTAX)
keyboard_js = """
<script>
let gameKeyboard = {
enabled: false,
buttons: {},
init: function() {
// Find movement buttons after DOM loads
setTimeout(function() {
var buttons = document.querySelectorAll('button');
buttons.forEach(function(btn) {
var text = btn.textContent.trim();
if (text === '↑') gameKeyboard.buttons.up = btn;
else if (text === '↓') gameKeyboard.buttons.down = btn;
else if (text === '←') gameKeyboard.buttons.left = btn;
else if (text === '→') gameKeyboard.buttons.right = btn;
else if (text.indexOf('⚔️') !== -1) gameKeyboard.buttons.action = btn;
});
// Update status indicator
const foundButtons = Object.keys(gameKeyboard.buttons).length;
const statusDiv = document.querySelector('div[style*="background: #e8f5e8"]');
if (statusDiv) {
if (foundButtons >= 4) {
statusDiv.innerHTML = '<span style="color: #2e7d32;">🎮 Keyboard controls ready! Use WASD or Arrow Keys</span>';
gameKeyboard.enabled = true;
console.log('🎮 Game keyboard initialized - found', foundButtons, 'buttons');
} else {
statusDiv.innerHTML = '<span style="color: #d32f2f;">❌ Keyboard loading... found ' + foundButtons + '/4 buttons</span>';
console.log('⏳ Still looking for movement buttons, found:', foundButtons);
}
}
}, 1000);
},
handleKey: function(event) {
if (!gameKeyboard.enabled) return;
// Only capture keys when not typing in inputs
var tagName = event.target.tagName.toLowerCase();
if (tagName === 'input' || tagName === 'textarea') {
return;
}
var button = null;
switch(event.code) {
case 'ArrowUp':
case 'KeyW':
button = gameKeyboard.buttons.up;
break;
case 'ArrowDown':
case 'KeyS':
button = gameKeyboard.buttons.down;
break;
case 'ArrowLeft':
case 'KeyA':
button = gameKeyboard.buttons.left;
break;
case 'ArrowRight':
case 'KeyD':
button = gameKeyboard.buttons.right;
break;
case 'Space':
button = gameKeyboard.buttons.action;
break;
}
if (button) {
event.preventDefault();
button.click();
// Visual feedback
button.style.backgroundColor = '#ff6b6b';
button.style.transform = 'scale(0.95)';
setTimeout(function() {
button.style.backgroundColor = '';
button.style.transform = '';
}, 150);
// Update status
const statusDiv = document.querySelector('div[style*="background: #e8f5e8"]');
if (statusDiv) {
statusDiv.innerHTML = '<span style="color: #2e7d32;">🎮 Last key: ' + event.code + ' ✓</span>';
}
console.log('🎯 Key pressed: ' + event.code);
}
}
};
// Initialize when page loads
document.addEventListener('DOMContentLoaded', function() {
gameKeyboard.init();
});
// Add keyboard listener
document.addEventListener('keydown', function(e) {
gameKeyboard.handleKey(e);
});
// Reinitialize when Gradio updates (for dynamic content)
setInterval(function() {
if (!gameKeyboard.enabled) {
gameKeyboard.init();
}
}, 2000);
</script>
"""
gr.HTML(keyboard_js)
# ====================================================================
# EVENT HANDLERS - RESTORED
# ====================================================================
def join_game(name: str, current_state: Dict, request: gr.Request):
    """Handle a click on "Join Game".

    Every branch returns the same 9-tuple, in the order the click handler's
    outputs are wired:
    (state, player stats, world HTML, online-player rows, events text,
    proximity HTML, private-chat group update, nearby-entities dropdown
    update, private messages).

    The validation and "already connected" branches previously returned only
    the first five values, which does not match the nine wired outputs; they
    now append the live proximity values from ``get_proximity_status``.

    Args:
        name: Character name typed by the user; surrounding whitespace is ignored.
        current_state: Per-session state dict (may contain ``player_id``).
        request: Gradio request; its ``session_hash`` is stored on the player.
    """
    print(f"[JOIN] Called with name='{name}', state={current_state}")

    clean_name = (name or "").strip()
    if not clean_name:
        return (
            current_state,
            {"status": "❌ Error", "info": "Please enter a valid name"},
            create_game_world_html(),
            [],
            "Enter a player name to join!",
            # Reuse the live proximity values so an already-joined player who
            # clicks with an empty name does not lose the private-chat panel.
            *get_proximity_status(current_state),
        )

    # A session that already owns a live player is not registered twice.
    existing_id = current_state.get("player_id")
    if existing_id and existing_id in game_world.players:
        return (
            current_state,
            get_player_stats_display(existing_id),
            create_game_world_html(),
            [[p.name, p.type, p.level] for p in game_world.players.values()],
            "Already connected!",
            *get_proximity_status(current_state),
        )

    player_id = str(uuid.uuid4())
    player = Player(
        id=player_id,
        name=clean_name,
        type="human",
        session_hash=request.session_hash,
        last_active=time.time(),
    )

    if not game_world.add_player(player):
        # add_player refused the player; the message below is the one the
        # original code shows for that case.
        return (
            current_state,
            {"status": "❌ Error", "info": "Game is full (20/20 players)"},
            create_game_world_html(),
            [],
            "Game is full!",
            "<div style='text-align: center; color: #666;'>🔍 Join the game to see nearby entities</div>",
            gr.update(visible=False),
            gr.update(choices=[]),
            [],
        )

    # Store the same (stripped) name that was given to the Player object.
    new_state = {"player_id": player_id, "player_name": clean_name}
    player_display = get_player_stats_display(player_id)
    world_html = create_game_world_html()
    players_list = [[p.name, p.type, p.level] for p in game_world.players.values()]
    events = "\n".join(f"{e.get('event', '')}" for e in game_world.world_events[-5:])
    return (
        new_state,
        player_display,
        world_html,
        players_list,
        events,
        *get_proximity_status(new_state),
    )
def leave_game(current_state: Dict):
    """Handle a click on "Leave Game".

    Returns a 9-tuple: (state, player stats, world HTML, online-player rows,
    events text, proximity HTML, private-chat group update, nearby-entities
    dropdown update, private messages). When the session owns a player it is
    removed from ``game_world`` and the state is reset to ``{}``; otherwise
    the state is passed back untouched with a "Not connected" notice.
    """
    idle_hint = "<div style='text-align: center; color: #666;'>🔍 Join the game to see nearby entities</div>"
    player_id = current_state.get("player_id") if current_state else None

    if not player_id:
        return (
            current_state,
            {"status": "Not connected", "info": "You're not in the game"},
            create_game_world_html(),
            [],
            "Not connected",
            idle_hint,
            gr.update(visible=False),
            gr.update(choices=[]),
            [],
        )

    game_world.remove_player(player_id)
    world_html = create_game_world_html()
    roster = [[p.name, p.type, p.level] for p in game_world.players.values()]
    recent_events = "\n".join([e.get('event', '') for e in game_world.world_events[-5:]])
    return (
        {},  # cleared session state
        {"status": "Not connected", "info": "Join the game to see your stats"},
        world_html,
        roster,
        recent_events,
        idle_hint,
        gr.update(visible=False),
        gr.update(choices=[]),
        [],
    )
def handle_movement(direction: str, current_state: Dict):
    """Move the session's player one step and refresh the dependent widgets.

    Always returns an 8-tuple: (state, world HTML, online-player rows,
    player stats, proximity HTML, private-chat group update, nearby-entities
    dropdown update, private messages). The not-connected branch used to
    return only the first four values, unlike the two other branches; it now
    appends the proximity values so every path has the same arity.

    Args:
        direction: Direction token forwarded to ``game_world.move_player``.
        current_state: Per-session state dict (may contain ``player_id``).
    """
    print(f"[MOVE] Direction: {direction}, state: {current_state}")

    player_id = current_state.get("player_id")
    if not player_id:
        return (
            current_state,
            create_game_world_html(),
            [],
            get_player_stats_display(None),
            # get_proximity_status returns its "join the game" defaults here.
            *get_proximity_status(current_state),
        )

    moved = game_world.move_player(player_id, direction)
    # NOTE(review): a rejected move sends an empty roster, exactly as before —
    # confirm that blanking the table in that case is intended.
    players_list = (
        [[p.name, p.type, p.level] for p in game_world.players.values()]
        if moved
        else []
    )
    return (
        current_state,
        create_game_world_html(),
        players_list,
        get_player_stats_display(player_id),
        *get_proximity_status(current_state),
    )
def handle_chat_command(message: str, player: "Player", player_id: str) -> str:
    """Execute a slash command typed into the game chat and return its reply.

    Supported commands: /heal, /stats, /time, /players, /npcs, /help. Anything
    else yields an "Unknown command" hint.

    The health commands use the ``hp`` / ``max_hp`` fields that the ``Player``
    dataclass actually defines (it has no ``health`` attribute, so the earlier
    ``player.health`` accesses could not work).

    Args:
        message: Raw chat text; the first whitespace-separated token is the command.
        player: The player issuing the command (mutated by /heal).
        player_id: Id of that player; accepted for interface compatibility, unused.

    Returns:
        The text to post to chat as the system's answer.
    """
    heal_amount = 25   # HP restored per /heal
    npc_radius = 100   # distance within which an NPC is listed by /npcs

    parts = message.split()
    # An empty/blank message has no tokens; treat it as an unknown command
    # instead of raising IndexError.
    command = parts[0].lower() if parts else ""

    if command == "/heal":
        if player.hp >= player.max_hp:
            return f"💚 {player.name} is already at full health!"
        before = player.hp
        player.hp = min(player.max_hp, player.hp + heal_amount)
        game_world.add_world_event(f"✨ {player.name} used healing magic!")
        return (
            f"💚 {player.name} healed for {player.hp - before} HP! "
            f"Health: {player.hp}/{player.max_hp}"
        )

    if command == "/stats":
        return (
            f"📊 {player.name} - Health: {player.hp}/{player.max_hp}, "
            f"Position: ({player.x}, {player.y}), Type: {player.type}"
        )

    if command == "/time":
        return f"🕐 Current time: {time.strftime('%H:%M:%S')}"

    if command == "/players":
        names = [p.name for p in game_world.players.values()]
        return f"👥 Online players ({len(names)}): {', '.join(names)}"

    if command == "/npcs":
        nearby_npcs = [
            f"{npc['name']} ({npc['type']})"
            for npc in game_world.npcs.values()
            if ((player.x - npc['x']) ** 2 + (player.y - npc['y']) ** 2) ** 0.5 <= npc_radius
        ]
        if nearby_npcs:
            return f"🤖 Nearby NPCs: {', '.join(nearby_npcs)}"
        return "🤖 No NPCs nearby. Move around to find them!"

    if command == "/help":
        return (
            "📖 Available Commands:\n"
            f"/heal - Restore {heal_amount} HP\n"
            "/stats - Show your player stats\n"
            "/time - Show current time\n"
            "/players - List online players\n"
            "/npcs - List nearby NPCs\n"
            "/help - Show this help"
        )

    return f"❓ Unknown command: {command}. Use /help to see available commands."
def handle_chat(message: str, history: List, current_state: Dict):
    """Post a public chat line (or run a slash command) for the session's player.

    Returns ``(chat_history, "")`` where the empty string clears the input
    box. Blank messages, sessions without a ``player_id`` and unknown players
    leave ``history`` untouched. Otherwise the history is rebuilt from the
    last 20 entries of ``game_world.chat_messages``.
    """
    if not message.strip():
        return history, ""

    player_id = current_state.get("player_id")
    sender = game_world.players.get(player_id) if player_id else None
    if not sender:
        return history, ""

    if message.startswith('/'):
        # Slash commands are answered by the system account, not echoed as chat.
        reply = handle_chat_command(message, sender, player_id)
        if reply:
            game_world.add_chat_message("🎮 System", reply)
    else:
        game_world.add_chat_message(sender.name, message.strip())

    recent = [
        {
            "role": "assistant",
            "content": f"[{entry['timestamp']}] {entry['sender']}: {entry['message']}",
        }
        for entry in game_world.chat_messages[-20:]
    ]
    return recent, ""
def get_updated_private_messages(player_id: str):
    """Return the player's private messages as Chatbot ``messages`` entries.

    A falsy ``player_id`` gives an empty list. Every entry is emitted with the
    "assistant" role and a ``[timestamp] sender: message`` content line.
    """
    if not player_id:
        return []

    entries = [
        {
            "role": "assistant",
            "content": f"[{pm['timestamp']}] {pm['sender']}: {pm['message']}",
        }
        for pm in game_world.get_private_messages_for_player(player_id)
    ]
    print(f"[PRIVATE_UPDATE] Returning {len(entries)} private messages for player {player_id}")
    return entries
def handle_private_message(message: str, current_state: Dict, chat_tabs_state: Dict):
    """Send ``message`` to whichever chat tab is currently marked active.

    Returns ``(messages, "")``: the refreshed conversation with the active
    entity on success, or an empty list whenever the message is blank, the
    session has no player, no tabs exist, no tab is active, or the send fails.
    The empty string clears the input box.
    """
    # Guards are evaluated in the same order as separate checks would be.
    if not message.strip() or not current_state.get("player_id") or not chat_tabs_state:
        return [], ""

    print(f"[PRIVATE_MESSAGE_DEBUG] Chat tabs state: {chat_tabs_state}")

    # The first tab flagged active wins.
    active_entity_id = None
    for tab_key, tab in chat_tabs_state.items():
        print(f"[PRIVATE_MESSAGE_DEBUG] Checking tab: entity_id={tab_key}, tab_info={tab}")
        if tab.get('active', False):
            active_entity_id = tab_key
            break
    print(f"[PRIVATE_MESSAGE_DEBUG] Active entity_id: {active_entity_id}")

    if not active_entity_id:
        return [], ""

    player_id = current_state["player_id"]
    text = message.strip()
    print(f"[PRIVATE_MESSAGE_DEBUG] Sending message from player {player_id} to entity {active_entity_id}: '{text}'")

    delivered, _error = game_world.send_private_message(player_id, active_entity_id, text)
    if not delivered:
        return [], ""
    return get_chat_messages_for_entity(player_id, active_entity_id), ""
def get_chat_messages_for_entity(player_id: str, entity_id: str) -> List:
    """Collect the private conversation between a player and one entity.

    Scans ``game_world.chat_messages`` for entries whose ``type`` is one of
    the four private kinds and whose ``sender_id``/``target`` pair matches the
    player and entity in either direction. Lines sent by the player get the
    "user" role, everything else "assistant".
    """
    private_kinds = ['private_to_npc', 'private_from_npc', 'private_to_player', 'private_from_player']

    def _is_between_us(entry) -> bool:
        if entry.get('type', '') not in private_kinds:
            return False
        outgoing = entry.get('sender_id') == player_id and entry.get('target') == entity_id
        incoming = entry.get('sender_id') == entity_id and entry.get('target') == player_id
        return outgoing or incoming

    conversation = [
        {
            "role": "user" if entry.get('sender_id') == player_id else "assistant",
            "content": f"[{entry['timestamp']}] {entry['sender']}: {entry['message']}",
        }
        for entry in game_world.chat_messages
        if _is_between_us(entry)
    ]
    print(f"[CHAT_MESSAGES] Found {len(conversation)} messages between player {player_id} and entity {entity_id}")
    return conversation
def start_new_chat(entity_id: str, entity_name: str, chat_tabs_state: Dict) -> tuple:
    """Open (or re-focus) the chat tab for ``entity_id``.

    Mutates ``chat_tabs_state`` in place so that exactly the requested tab is
    active, creating it if needed, and returns ``(chat_tabs_state, tabs_html)``.
    With a missing id or name nothing changes and the second element is the
    text "No entity selected".
    """
    if not (entity_id and entity_name):
        return chat_tabs_state, "No entity selected"

    # Create the tab on first use; an existing tab keeps its name/pin/unread.
    chat_tabs_state.setdefault(
        entity_id,
        {'name': entity_name, 'active': True, 'pinned': False, 'unread': 0},
    )

    # Exactly one active tab: the requested one.
    for tab_key, tab in chat_tabs_state.items():
        tab['active'] = (tab_key == entity_id)

    return chat_tabs_state, generate_chat_tabs_html(chat_tabs_state)
def close_chat_tab(entity_id: str, chat_tabs_state: Dict) -> tuple:
    """Close a chat tab unless it is pinned.

    Mutates ``chat_tabs_state`` in place and returns
    ``(chat_tabs_state, tabs_html)``. Unknown ids and pinned tabs are left
    alone. After a removal, the first remaining tab is activated only when no
    remaining tab is already active — previously it was activated
    unconditionally, which could leave two tabs flagged active after closing
    an inactive one.
    """
    tab = chat_tabs_state.get(entity_id)
    if tab is not None and not tab.get('pinned', False):
        del chat_tabs_state[entity_id]
        still_active = any(t.get('active') for t in chat_tabs_state.values())
        if chat_tabs_state and not still_active:
            # The closed tab was the active one: fall back to the first tab left.
            first_tab = next(iter(chat_tabs_state))
            chat_tabs_state[first_tab]['active'] = True

    return chat_tabs_state, generate_chat_tabs_html(chat_tabs_state)
def toggle_pin_tab(entity_id: str, chat_tabs_state: Dict) -> tuple:
    """Flip the ``pinned`` flag of one chat tab.

    Unknown ids are ignored. Returns ``(chat_tabs_state, tabs_html)`` with the
    state mutated in place.
    """
    if entity_id in chat_tabs_state:
        tab = chat_tabs_state[entity_id]
        was_pinned = tab.get('pinned', False)
        tab['pinned'] = not was_pinned
    return chat_tabs_state, generate_chat_tabs_html(chat_tabs_state)
def clear_all_chat_tabs(chat_tabs_state: Dict) -> tuple:
    """Drop every chat tab that is not pinned.

    Mutates ``chat_tabs_state`` in place (same dict object) and returns
    ``(chat_tabs_state, tabs_html)``. If pinned tabs survive, the first one is
    activated only when none of the survivors is already active — activating
    it unconditionally could leave two tabs flagged active.
    """
    pinned_tabs = {key: tab for key, tab in chat_tabs_state.items() if tab.get('pinned', False)}
    # clear()+update() keeps the caller's dict object instead of rebinding it.
    chat_tabs_state.clear()
    chat_tabs_state.update(pinned_tabs)

    if chat_tabs_state and not any(tab.get('active') for tab in chat_tabs_state.values()):
        first_tab = next(iter(chat_tabs_state))
        chat_tabs_state[first_tab]['active'] = True

    return chat_tabs_state, generate_chat_tabs_html(chat_tabs_state)
# New: handle starting a chat and show chat area
def handle_start_chat(selection: str, chat_tabs_state: Dict, current_state: Dict):
    """Open a chat tab for the dropdown selection and reveal the chat widgets.

    ``selection`` is the dropdown value, used directly as the entity id.
    Returns ``(chat_tabs_state, tabs_html, chatbot_update, input_row_update)``.
    With no (or a non-string) selection the chatbot is emptied and both
    widgets are hidden; otherwise the tab is created/activated and the
    conversation with that entity is loaded.
    """
    def _hide_chat():
        # Shared "nothing to show" result for the rejected-selection paths.
        return (
            chat_tabs_state,
            generate_chat_tabs_html(chat_tabs_state),
            gr.update(value=[], visible=False),
            gr.update(visible=False),
        )

    print(f"[HANDLE_START_CHAT] Called with selection='{selection}', type={type(selection)}")
    if not selection:
        print("[HANDLE_START_CHAT] No selection provided")
        return _hide_chat()

    entity_id = selection
    print(f"[HANDLE_START_CHAT] Using entity_id='{entity_id}'")

    if not (entity_id and isinstance(entity_id, str)):
        print(f"[HANDLE_START_CHAT] Invalid entity_id: {entity_id}")
        return _hide_chat()

    # Resolve a display name: NPCs take precedence over players.
    if entity_id in game_world.npcs:
        entity_name, entity_type = game_world.npcs[entity_id]['name'], "NPC"
        print(f"[HANDLE_START_CHAT] Found NPC: {entity_id} -> {entity_name}")
    elif entity_id in game_world.players:
        entity_name, entity_type = game_world.players[entity_id].name, "Player"
        print(f"[HANDLE_START_CHAT] Found Player: {entity_id} -> {entity_name}")
    else:
        print(f"[HANDLE_START_CHAT] ERROR: Entity '{entity_id}' not found in NPCs or players")
        print(f"[HANDLE_START_CHAT] Available NPCs: {list(game_world.npcs)}")
        print(f"[HANDLE_START_CHAT] Available Players: {list(game_world.players)}")
        # Fall back to the raw id so a tab can still be opened.
        entity_name, entity_type = entity_id, "Unknown"

    print(f"[CHAT_START] Starting chat with entity_id: '{entity_id}', entity_name: '{entity_name}', type: {entity_type}")

    # A "Name (TYPE)" shaped id means a dropdown label leaked through as the value.
    looks_like_label = "(" in entity_id and ")" in entity_id
    if entity_name and looks_like_label:
        print(f"[HANDLE_START_CHAT] WARNING: entity_id '{entity_id}' looks like a display name! This suggests a bug.")

    chat_tabs_state, tabs_html = start_new_chat(entity_id, entity_name, chat_tabs_state)

    player_id = current_state.get("player_id")
    if player_id:
        messages = get_chat_messages_for_entity(player_id, entity_id)
    else:
        messages = []
    print(f"[HANDLE_START_CHAT] Created/switched to tab with key: '{entity_id}' and loaded {len(messages)} messages")

    return (
        chat_tabs_state,
        tabs_html,
        gr.update(value=messages, visible=True),
        gr.update(visible=True),
    )
# Build the HTML strip that lists the open chat tabs
def generate_chat_tabs_html(chat_tabs_state: Dict) -> str:
    """Render the open chat tabs as an HTML snippet.

    Args:
        chat_tabs_state: Mapping of entity id to a tab dict with a required
            ``name`` and optional ``active``, ``pinned`` and ``unread`` keys.

    Returns:
        A "No active chats" placeholder when the mapping is empty, otherwise a
        flex container with one box per tab: highlighted when active, with a
        pin icon when pinned and an ``(n)`` badge when ``unread`` is positive.

    Tab names and entity ids can come from user-chosen player names, so both
    are HTML-escaped before being placed in the markup.
    """
    from html import escape  # local import keeps this block self-contained

    if not chat_tabs_state:
        return "<div style='text-align: center; color: #666; padding: 10px;'>No active chats</div>"

    pieces = ["<div style='display: flex; flex-wrap: wrap; gap: 5px; padding: 5px;'>"]
    for entity_id, tab_info in chat_tabs_state.items():
        if tab_info.get('active'):
            active_style = "background: #e3f2fd; border: 2px solid #2196f3;"
        else:
            active_style = "background: #f5f5f5; border: 1px solid #ccc;"
        pin_icon = "📌" if tab_info.get('pinned') else ""
        unread = tab_info.get('unread', 0)
        unread_badge = f" ({unread})" if unread > 0 else ""

        # quote=True also escapes ' and ", which matters inside title='...'.
        safe_id = escape(str(entity_id), quote=True)
        safe_name = escape(str(tab_info['name']), quote=True)

        pieces.append(
            f"""
<div style='{active_style} padding: 8px 12px; border-radius: 8px; cursor: pointer; display: flex; align-items: center; gap: 5px;'>
<span style='flex: 1;' title='Entity ID: {safe_id}'>{safe_name}{unread_badge}</span>
{pin_icon}
<span style='color: #f44336; font-weight: bold; margin-left: 5px; cursor: pointer;' title='Close tab'>×</span>
</div>
"""
        )
    pieces.append("</div>")
    return "".join(pieces)
def switch_to_chat_tab(entity_id: str, chat_tabs_state: Dict, player_id: str) -> tuple:
    """Make ``entity_id`` the active chat tab and load its conversation.

    Returns ``(chat_tabs_state, tabs_html, messages, show_input)``. For an id
    with no open tab the state is untouched, ``messages`` is empty and
    ``show_input`` is ``False``. Otherwise every other tab is deactivated and
    the chosen tab's unread counter is reset to 0.
    """
    if entity_id not in chat_tabs_state:
        return chat_tabs_state, generate_chat_tabs_html(chat_tabs_state), [], False

    for tab in chat_tabs_state.values():
        tab['active'] = False

    chosen = chat_tabs_state[entity_id]
    chosen['active'] = True
    chosen['unread'] = 0  # opening the tab marks everything as read

    messages = get_chat_messages_for_entity(player_id, entity_id)
    tabs_html = generate_chat_tabs_html(chat_tabs_state)
    return chat_tabs_state, tabs_html, messages, True  # True -> show chat input
def get_proximity_status(current_state: Dict, preserve_selection: str = None):
    """Describe who is close enough for a private chat.

    Args:
        current_state: Per-session state dict (may contain ``player_id``).
        preserve_selection: Entity id currently selected in the dropdown; it
            is kept as the dropdown value if that entity is still nearby.

    Returns:
        A 4-tuple ``(proximity_html, group_update, dropdown_update,
        private_messages)``: an HTML summary, a visibility update for the
        private-chat group, a choices/value update for the nearby-entities
        dropdown, and the player's private messages as Chatbot entries.

    Fixes over the earlier version:
      * the selection is validated against the dropdown *values* (entity
        ids); it used to be compared with the labels, so it never matched;
      * entity names are HTML-escaped before going into ``proximity_html``
        because player names are user-supplied.
    """
    from html import escape  # local import keeps this block self-contained

    player_id = current_state.get("player_id")
    print(f"[PROXIMITY_DEBUG] Called with player_id={player_id}, preserve_selection={preserve_selection}")

    if not player_id:
        print("[PROXIMITY_DEBUG] No player_id, returning empty")
        return (
            "<div style='text-align: center; color: #666;'>🔍 Join the game to see nearby entities</div>",
            gr.update(visible=False),
            gr.update(choices=[]),
            [],
        )

    nearby_entities = game_world.check_npc_proximity(player_id)
    print(f"[PROXIMITY_DEBUG] Found {len(nearby_entities)} nearby entities: {[e.get('name') for e in nearby_entities]}")

    if not nearby_entities:
        print("[PROXIMITY_DEBUG] No nearby entities, returning empty choices")
        return (
            "<div style='text-align: center; color: #666;'>🔍 Move near NPCs or players to chat privately</div>",
            gr.update(visible=False),
            gr.update(choices=[], value=None),
            [],
        )

    entity_list = []       # escaped HTML lines for the summary box
    dropdown_choices = []  # (label, value) tuples; value is the entity id
    for entity in nearby_entities:
        entity_id = entity['id']
        entity_name = entity['name']
        entity_type = entity['type']

        safe_name = escape(str(entity_name))
        if entity_type == 'npc':
            entity_list.append(f"🤖 {safe_name} (NPC)")
        else:  # any non-'npc' type is presented as a player
            entity_list.append(f"👤 {safe_name} (Player)")

        # The dropdown renders labels as text, so the raw name is used there.
        display_label = f"{entity_name} ({entity_type.upper()})"
        dropdown_choices.append((display_label, entity_id))
        print(f"[PROXIMITY_DEBUG] Added dropdown choice: Label='{display_label}' -> Value='{entity_id}'")

    print(f"[PROXIMITY_DEBUG] Created {len(dropdown_choices)} dropdown choices:")
    for label, value in dropdown_choices:
        print(f"[PROXIMITY_DEBUG] - Label: '{label}', Value: '{value}'")
    print("[PROXIMITY_DEBUG] When user selects from dropdown, handle_start_chat will receive the Value (entity_id), not the Label")

    proximity_html = f"""
<div style='background: #e8f5e8; border: 1px solid #4caf50; padding: 10px; border-radius: 5px;'>
<div style='font-weight: bold; color: #2e7d32; margin-bottom: 5px;'>📱 Nearby for Private Chat:</div>
{'<br>'.join(entity_list)}
</div>
"""

    formatted_private = [
        {
            "role": "assistant",
            "content": f"[{msg['timestamp']}] {msg['sender']}: {msg['message']}",
        }
        for msg in game_world.get_private_messages_for_player(player_id)
    ]

    # Keep the user's selection only if that entity id is still offered.
    dropdown_value = None
    if preserve_selection:
        valid_ids = [value for _label, value in dropdown_choices]
        if preserve_selection in valid_ids:
            dropdown_value = preserve_selection
            print(f"[PROXIMITY_DEBUG] Preserving selection: {preserve_selection}")
        else:
            print(f"[PROXIMITY_DEBUG] Cannot preserve selection '{preserve_selection}' - not in valid_ids: {valid_ids}")

    print(f"[PROXIMITY_DEBUG] Returning dropdown with value={dropdown_value}, choices={dropdown_choices}")
    return (
        proximity_html,
        gr.update(visible=True),
        gr.update(choices=dropdown_choices, value=dropdown_value),
        formatted_private,
    )
def register_test_ai_agent(ai_name: str):
    """Register a throw-away AI agent from the "Test AI Agent" form.

    A random ``test_xxxxxxxx`` client id is generated and passed, together
    with the stripped name, to ``mcp_tools.register_ai_agent``. Returns a
    status line; any exception is reported in that line rather than raised.
    """
    agent_name = ai_name.strip()
    if not agent_name:
        return "Please enter AI agent name"

    try:
        test_client_id = "test_" + uuid.uuid4().hex[:8]
        agent_id = mcp_tools.register_ai_agent(agent_name, test_client_id)
        # The confirmation echoes the name exactly as it was typed.
        return f"✅ AI agent '{ai_name}' registered with ID: {agent_id}\nClient ID: {test_client_id}"
    except Exception as exc:
        return f"❌ Failed to register AI agent: {exc}"
def execute_ai_action(action: str, message: str):
    """Run a test action on behalf of the first registered AI agent.

    Args:
        action: Dropdown value such as "move up" or "chat". May be ``None``
            when nothing has been selected yet.
        message: Chat text, only used for the "chat" action.

    Returns:
        A human-readable result string. A missing action, or a "move" without
        a direction, is reported as "Unknown action" instead of raising
        (``None.startswith`` / ``split()[1]`` used to crash here).
    """
    if not mcp_tools.ai_agents:
        return "❌ No AI agents registered. Register an AI agent first!"

    # Demo behaviour: always act as the first registered agent.
    client_id = next(iter(mcp_tools.ai_agents))

    action = (action or "").strip()
    if action.startswith("move"):
        words = action.split()
        if len(words) < 2:
            return "❓ Unknown action"
        result = mcp_tools.move_ai_agent(client_id, words[1])
        return f"🤖 Move result:\n{json.dumps(result, indent=2)}"

    if action == "chat":
        text = (message or "").strip()
        if not text:
            return "❌ Please enter a message for chat"
        result = mcp_tools.ai_agent_chat(client_id, text)
        return f"💬 Chat result:\n{json.dumps(result, indent=2)}"

    return "❓ Unknown action"
def auto_refresh(current_state: Dict, current_dropdown: str, auto_refresh_enabled: bool):
    """Rebuild the live game displays for one refresh tick.

    Args:
        current_state: Session state dict; its ``"player_id"`` entry, when
            present, selects whose stats are displayed.
        current_dropdown: Current dropdown selection, forwarded to
            ``get_proximity_status`` so it can be preserved.
        auto_refresh_enabled: When falsy, nothing is recomputed.

    Returns:
        A 10-tuple: world HTML, player rows, formatted chat, event text,
        player stats, MCP status dict, proximity HTML, private-chat
        visibility, entity choices, and a no-op update for the last output.
        When refreshing is disabled, all ten entries are no-op updates.
    """
    if not auto_refresh_enabled:
        # Refresh is switched off: hand back ten no-op updates so every
        # output keeps its current value.
        return tuple(gr.update() for _ in range(10))

    player_id = current_state.get("player_id") if current_state else None

    # NPCs are moved before rendering so the HTML shows their new positions.
    game_world.update_moving_npcs()
    world_html = create_game_world_html()

    players_list = []
    for member in game_world.players.values():
        players_list.append([member.name, member.type, member.level])

    # Only the 20 most recent chat messages are shown.
    formatted_chat = [
        {
            "role": "assistant",
            "content": f"[{entry['timestamp']}] {entry['sender']}: {entry['message']}",
        }
        for entry in game_world.chat_messages[-20:]
    ]

    # Only the 5 most recent world events are shown.
    if game_world.world_events:
        events = "\n".join(
            f"{record.get('event', '')}" for record in game_world.world_events[-5:]
        )
    else:
        events = "No recent events"

    player_stats = get_player_stats_display(player_id)

    mcp_status = {
        "server_status": "🟢 Active",
        "server_url": "This Gradio app serves as MCP server",
        "tools_available": [
            "register_ai_agent",
            "move_agent",
            "send_chat",
            "get_game_state",
            "interact_with_npc",
        ],
        "active_ai_agents": len(mcp_tools.ai_agents),
        "total_players": len(game_world.players),
    }

    # Pass the current dropdown value through so the selection survives the
    # refresh; the fourth result (private messages) is intentionally unused.
    proximity_html, private_chat_visible, entity_choices, _ = get_proximity_status(
        current_state, current_dropdown
    )

    # The last slot is a no-op update so the private chat display is not
    # overwritten by the timer.
    return (
        world_html,
        players_list,
        formatted_chat,
        events,
        player_stats,
        mcp_status,
        proximity_html,
        private_chat_visible,
        entity_choices,
        gr.update(),
    )
# Wire up event handlers
# Joining and leaving refresh the same components, so the output list is
# declared once and handed to both click events.
session_outputs = [
    player_state, player_info, game_view, online_players, world_events,
    proximity_info, private_chat_group, nearby_entities, current_chat_display,
]
join_btn.click(fn=join_game, inputs=[player_name, player_state], outputs=session_outputs)
leave_btn.click(fn=leave_game, inputs=[player_state], outputs=session_outputs)
# Movement buttons - FIXED
def move_up_handler(state):
    """Click callback that delegates to ``handle_movement`` with direction "up"."""
    direction = "up"
    return handle_movement(direction, state)
def move_down_handler(state):
    """Click callback that delegates to ``handle_movement`` with direction "down"."""
    direction = "down"
    return handle_movement(direction, state)
def move_left_handler(state):
    """Click callback that delegates to ``handle_movement`` with direction "left"."""
    direction = "left"
    return handle_movement(direction, state)
def move_right_handler(state):
    """Click callback that delegates to ``handle_movement`` with direction "right"."""
    direction = "right"
    return handle_movement(direction, state)
# All four movement buttons take the same input and refresh the same
# components; only the handler differs, so they are registered in a loop
# (up, down, left, right - same order as before).
movement_outputs = [
    player_state, game_view, online_players, player_info,
    proximity_info, private_chat_group, nearby_entities, current_chat_display,
]
for arrow_button, arrow_handler in (
    (move_up, move_up_handler),
    (move_down, move_down_handler),
    (move_left, move_left_handler),
    (move_right, move_right_handler),
):
    arrow_button.click(fn=arrow_handler, inputs=[player_state], outputs=movement_outputs)
# Chat
# The send button's click and the input's submit event both run the same
# handler with identical inputs and outputs.
for chat_trigger in (chat_send.click, chat_input.submit):
    chat_trigger(
        fn=handle_chat,
        inputs=[chat_input, chat_display, player_state],
        outputs=[chat_display, chat_input],
    )
# Private Chat Handlers
# The send button's click and the input's submit event both deliver the
# private message through the same handler.
for private_trigger in (private_send_btn.click, private_message_input.submit):
    private_trigger(
        fn=handle_private_message,
        inputs=[private_message_input, player_state, chat_tabs_state],
        outputs=[current_chat_display, private_message_input],
    )
# AI testing
# Both buttons report their outcome into the same result component.
register_ai_btn.click(fn=register_test_ai_agent, inputs=[ai_name], outputs=[ai_result])
execute_ai_btn.click(fn=execute_ai_action, inputs=[ai_action, ai_message], outputs=[ai_result])
# Multi-Chat Tab Event Handlers
start_chat_btn.click(
    fn=handle_start_chat,
    inputs=[nearby_entities, chat_tabs_state, player_state],
    outputs=[chat_tabs_state, active_tabs_display, current_chat_display, chat_input_row],
)
clear_all_tabs_btn.click(
    fn=clear_all_chat_tabs,
    inputs=[chat_tabs_state],
    outputs=[chat_tabs_state, active_tabs_display],
)

# Auto-refresh
refresh_timer = gr.Timer(value=2)  # Refresh every 2 seconds
refresh_timer.tick(
    fn=auto_refresh,
    inputs=[player_state, nearby_entities, auto_refresh_enabled],
    outputs=[
        game_view, online_players, chat_display, world_events, player_info, mcp_info,
        proximity_info, private_chat_group, nearby_entities, current_chat_display,
    ],
)
return demo.queue()
# ============================================================================
# MAIN APPLICATION
# ============================================================================
if __name__ == "__main__":
    # Script entry point: print the startup banner, build the interface,
    # then launch the Gradio server.
    print("🎮 Starting COMPLETE MMORPG with ALL FEATURES...")
    print("\n🔧 Features Restored:")
    for feature_line in (
        "✅ FIXED: Deadlock issue resolved with RLock",
        "✅ Human players can join and play",
        "✅ AI agents can connect via MCP API",
        "✅ Extensible NPC add-on system",
        "✅ MCP servers can be used as add-ons",
        "✅ Real-time multiplayer gameplay",
        "✅ Read2Burn secure mailbox add-on",
        "✅ Keyboard controls (WASD + Arrow Keys)",
        "✅ Complete MCP integration",
        "✅ All original features restored",
    ):
        print(feature_line)
    print("=" * 60)

    app = create_mmorpg_interface()

    # Launch with MCP API access
    # This print had been merged into the comment above, so the launch
    # banner line was never shown; it is restored as a real statement.
    print("\n🚀 Launching complete game server...")
    for launch_line in (
        "🌐 Human players: Access via web interface",
        "🤖 AI agents: Connect via MCP API endpoints",
        "⌨️ Keyboard: Use WASD or Arrow Keys for movement",
        "🔌 MCP Tools available for AI integration",
        "🔥 Read2Burn mailbox for secure messaging",
        "🎯 All features working with deadlock fix!",
    ):
        print(launch_line)
    print("=" * 60)

    # NOTE(review): share=True is combined with a localhost-only
    # server_name and debug=True - confirm that exposing a share link is
    # intended for this entry point.
    app.launch(
        server_name="127.0.0.1",
        server_port=7868,
        mcp_server=True,
        share=True,
        show_error=True,
        debug=True,
    )