|
"""
|
|
Interface Manager Module
|
|
|
|
This module manages the complete user interface for the MMORPG application,
|
|
connecting the HuggingFace-style UI with the game services and handling all
|
|
user interactions and event bindings.
|
|
"""
|
|
|
|
import gradio as gr
|
|
import asyncio
|
|
import time
|
|
from typing import Dict, Any, List, Optional, Tuple
|
|
from dataclasses import asdict
|
|
|
|
from ..core.game_engine import GameEngine
|
|
from ..core.player import Player
|
|
from ..core.world import GameWorld
|
|
from ..facades.game_facade import GameFacade
|
|
from .huggingface_ui import HuggingFaceUI
|
|
|
|
|
|
class InterfaceManager:
|
|
"""Manages the complete user interface and event handling for the MMORPG."""
|
|
|
|
def __init__(self, game_facade: GameFacade, ui: HuggingFaceUI):
|
|
from ..core.game_engine import get_game_engine
|
|
self.game_engine = get_game_engine()
|
|
self.game_facade = game_facade
|
|
self.ui = ui
|
|
|
|
|
|
self.interface = None
|
|
|
|
|
|
self.auto_refresh_interval = 2.0
|
|
|
|
|
|
self.entity_name_to_id = {}
|
|
|
|
self.current_player_id = None
|
|
|
|
def create_interface(self) -> gr.Blocks:
|
|
"""Create and configure the complete interface with event handlers."""
|
|
|
|
|
|
self._create_interface_with_events()
|
|
|
|
return self.interface
|
|
|
|
def _create_interface_with_events(self) -> None:
|
|
"""Create the interface with all event handlers properly wired."""
|
|
|
|
|
|
keyboard_js = self.ui.get_keyboard_script()
|
|
|
|
with gr.Blocks(
|
|
title="🎮 MMORPG with MCP Integration",
|
|
theme=self.ui.theme,
|
|
css=self.ui.custom_css,
|
|
head=keyboard_js
|
|
) as interface:
|
|
|
|
|
|
self.ui.create_hero_section()
|
|
|
|
|
|
keyboard_status = self.ui.create_keyboard_status()
|
|
|
|
|
|
player_state = gr.State({})
|
|
|
|
with gr.Tabs():
|
|
|
|
with gr.Tab("🌍 Game World"):
|
|
with gr.Row():
|
|
with gr.Column(scale=2):
|
|
|
|
with gr.Group():
|
|
gr.Markdown("### 🎮 Join the Adventure")
|
|
with gr.Row():
|
|
player_name = gr.Textbox(
|
|
label="Player Name",
|
|
placeholder="Enter your character name",
|
|
scale=3
|
|
)
|
|
join_btn = gr.Button("Join Game", variant="primary", scale=1)
|
|
leave_btn = gr.Button("Leave Game", variant="secondary", scale=1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
game_view = gr.HTML(
|
|
value=self.ui._generate_world_html(800, 600),
|
|
label="🌍 Game World"
|
|
)
|
|
|
|
with gr.Group():
|
|
gr.Markdown("### 🕹️ Movement Controls")
|
|
gr.Markdown("**Keyboard:** WASD or Arrow Keys | **Mouse:** Click buttons below")
|
|
|
|
with gr.Row():
|
|
gr.HTML("")
|
|
move_up = gr.Button("↑", size="lg")
|
|
gr.HTML("")
|
|
with gr.Row():
|
|
move_left = gr.Button("←", size="lg")
|
|
action_btn = gr.Button("⚔️", size="lg", variant="secondary")
|
|
move_right = gr.Button("→", size="lg")
|
|
with gr.Row():
|
|
gr.HTML("")
|
|
move_down = gr.Button("↓", size="lg")
|
|
gr.HTML("")
|
|
|
|
with gr.Column(scale=1):
|
|
|
|
player_info = gr.JSON(
|
|
label="🧝♂️ Player Stats",
|
|
value={"status": "Not connected", "info": "Join the game to see your stats"}
|
|
)
|
|
|
|
|
|
online_players = gr.Dataframe(
|
|
headers=["Name", "Type", "Level"],
|
|
label="👥 Online Players" )
|
|
|
|
|
|
world_events = gr.Textbox(
|
|
label="🌍 World Events & NPC Interactions",
|
|
lines=4,
|
|
interactive=False,
|
|
placeholder="World events will appear here...<br><br>💡 Tip: Walk near NPCs (📮🏪🌤️) to interact with them!<br>Then visit the 'NPC Add-ons' tab to use their features."
|
|
)
|
|
|
|
with gr.Row():
|
|
with gr.Column(scale=4):
|
|
chat_display = gr.Chatbot(
|
|
label="💬 Game Chat",
|
|
height=300,
|
|
type='messages',
|
|
value=[{"role": "assistant", "content": "Welcome! Join the game to start chatting!"}]
|
|
)
|
|
with gr.Row():
|
|
chat_input = gr.Textbox(
|
|
placeholder="Type your message... (use /help for commands)",
|
|
scale=4,
|
|
container=False
|
|
)
|
|
chat_send = gr.Button("Send", scale=1, variant="primary")
|
|
|
|
|
|
with gr.Column(scale=2):
|
|
|
|
with gr.Row():
|
|
auto_refresh_enabled = gr.Checkbox(
|
|
label="Auto-refresh (2s)",
|
|
value=True,
|
|
info="Toggle to preserve manual selections"
|
|
)
|
|
|
|
proximity_info = gr.HTML(
|
|
value="<div style='text-align: center; color: #666;'>🔍 Move near NPCs or players to chat privately</div>",
|
|
label="📱 Nearby Entities"
|
|
)
|
|
|
|
|
|
with gr.Group(visible=False) as private_chat_group:
|
|
nearby_entities = gr.Dropdown(
|
|
label="💬 Start new chat with",
|
|
choices=[],
|
|
interactive=True
|
|
)
|
|
|
|
with gr.Row():
|
|
start_chat_btn = gr.Button("Start Chat", variant="primary", scale=1)
|
|
clear_all_tabs_btn = gr.Button("Clear All", variant="secondary", scale=1)
|
|
|
|
|
|
chat_tabs_state = gr.State({})
|
|
active_tabs_display = gr.HTML(
|
|
value="<div style='text-align: center; color: #666; padding: 10px;'>No active chats</div>",
|
|
label="Active Chats"
|
|
)
|
|
|
|
|
|
current_chat_display = gr.Chatbot(
|
|
label="🔒 Private Messages",
|
|
height=200,
|
|
type='messages',
|
|
value=[],
|
|
visible=False
|
|
)
|
|
|
|
with gr.Row(visible=False) as chat_input_row:
|
|
private_message_input = gr.Textbox(
|
|
placeholder="Type private message...",
|
|
scale=4,
|
|
container=False
|
|
)
|
|
private_send_btn = gr.Button("Send", scale=1, variant="secondary")
|
|
|
|
with gr.Tab("📚 Documentation"):
|
|
self.ui._create_documentation_content()
|
|
|
|
|
|
with gr.Tab("🤖 NPC Add-ons"):
|
|
self.ui._create_npc_addons_content()
|
|
|
|
|
|
with gr.Tab("🔌 Plugin System"):
|
|
self.ui._create_plugin_system_content()
|
|
|
|
with gr.Tab("🔗 MCP Integration"):
|
|
self.ui._create_mcp_integration_content()
|
|
|
|
|
|
with gr.Tab("🧪 AI Agent Testing"):
|
|
gr.Markdown("## 🧪 Test AI Agent Integration")
|
|
gr.Markdown("*Use this to simulate AI agent connections for testing*")
|
|
|
|
with gr.Row():
|
|
ai_name = gr.Textbox(
|
|
label="AI Agent Name",
|
|
placeholder="Claude the Explorer",
|
|
scale=3
|
|
)
|
|
register_ai_btn = gr.Button("Register AI Agent", variant="primary", scale=1)
|
|
|
|
with gr.Row():
|
|
ai_action = gr.Dropdown(
|
|
choices=["move up", "move down", "move left", "move right", "chat"],
|
|
label="AI Action",
|
|
scale=2
|
|
)
|
|
ai_message = gr.Textbox(
|
|
label="AI Message (for chat)",
|
|
placeholder="Hello humans!",
|
|
scale=3
|
|
)
|
|
execute_ai_btn = gr.Button("Execute AI Action", variant="secondary")
|
|
ai_result = gr.Textbox(label="AI Action Result", interactive=False, lines=5)
|
|
|
|
|
|
gr.Markdown("### 🧪 Test Raw Data Access")
|
|
gr.Markdown("*Test accessing comprehensive world data without HTML parsing*")
|
|
|
|
with gr.Row():
|
|
test_raw_data_btn = gr.Button("Get Raw World Data", variant="secondary")
|
|
test_available_npcs_btn = gr.Button("Get Available NPCs", variant="secondary")
|
|
|
|
raw_data_result = gr.JSON(label="Raw Data Output", visible=True)
|
|
|
|
|
|
with gr.Tab("🏗️ Architecture"):
|
|
self.ui._create_architecture_content()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
join_btn.click(
|
|
self._handle_join_game,
|
|
inputs=[player_name, player_state],
|
|
outputs=[player_state, player_info, online_players, game_view]
|
|
)
|
|
|
|
|
|
leave_btn.click(
|
|
self._handle_leave_game,
|
|
inputs=[player_state],
|
|
outputs=[player_state, player_info, online_players, game_view]
|
|
)
|
|
|
|
|
|
move_up.click(
|
|
self._handle_movement,
|
|
inputs=[gr.State("up"), player_state],
|
|
outputs=[player_state, game_view, world_events]
|
|
)
|
|
|
|
move_down.click(
|
|
self._handle_movement,
|
|
inputs=[gr.State("down"), player_state],
|
|
outputs=[player_state, game_view, world_events]
|
|
)
|
|
|
|
move_left.click(
|
|
self._handle_movement,
|
|
inputs=[gr.State("left"), player_state],
|
|
outputs=[player_state, game_view, world_events]
|
|
)
|
|
|
|
move_right.click(
|
|
self._handle_movement,
|
|
inputs=[gr.State("right"), player_state],
|
|
outputs=[player_state, game_view, world_events]
|
|
)
|
|
|
|
action_btn.click(
|
|
self._handle_action,
|
|
inputs=[player_state],
|
|
outputs=[world_events]
|
|
)
|
|
|
|
|
|
chat_send.click(
|
|
self._handle_chat_message,
|
|
inputs=[chat_input, player_state],
|
|
outputs=[chat_display, chat_input]
|
|
)
|
|
|
|
chat_input.submit(
|
|
self._handle_chat_message,
|
|
inputs=[chat_input, player_state],
|
|
outputs=[chat_display, chat_input]
|
|
)
|
|
|
|
|
|
start_chat_btn.click(
|
|
self._handle_start_chat,
|
|
inputs=[nearby_entities, chat_tabs_state, player_state],
|
|
outputs=[chat_tabs_state, active_tabs_display, current_chat_display, chat_input_row]
|
|
)
|
|
|
|
private_send_btn.click(
|
|
self._handle_private_message,
|
|
inputs=[private_message_input, chat_tabs_state, player_state],
|
|
outputs=[current_chat_display, private_message_input]
|
|
)
|
|
|
|
clear_all_tabs_btn.click(
|
|
self._handle_clear_all_chats,
|
|
inputs=[chat_tabs_state],
|
|
outputs=[chat_tabs_state, active_tabs_display, current_chat_display, chat_input_row]
|
|
)
|
|
|
|
|
|
register_ai_btn.click(
|
|
self._handle_register_ai_agent,
|
|
inputs=[ai_name],
|
|
outputs=[ai_result]
|
|
)
|
|
execute_ai_btn.click(
|
|
self._handle_execute_ai_action,
|
|
inputs=[ai_action, ai_message],
|
|
outputs=[ai_result]
|
|
)
|
|
|
|
|
|
test_raw_data_btn.click(
|
|
self._handle_test_raw_world_data,
|
|
inputs=[],
|
|
outputs=[raw_data_result]
|
|
)
|
|
|
|
test_available_npcs_btn.click(
|
|
self._handle_test_available_npcs,
|
|
inputs=[],
|
|
outputs=[raw_data_result]
|
|
)
|
|
|
|
|
|
refresh_timer = gr.Timer(value=self.auto_refresh_interval)
|
|
refresh_timer.tick(
|
|
self._auto_refresh_game_state,
|
|
inputs=[player_state, auto_refresh_enabled],
|
|
outputs=[player_info, online_players, game_view, proximity_info, nearby_entities, private_chat_group, chat_display, world_events]
|
|
)
|
|
|
|
self.interface = interface
|
|
|
|
|
|
|
|
|
|
def _handle_join_game(self, name: str, current_state: Dict) -> Tuple[Dict, Dict, List[List[str]], str]:
|
|
"""Handle player joining the game."""
|
|
|
|
if not name or not name.strip():
|
|
return current_state, {"error": "Please enter a valid name"}, [], self.ui._generate_world_html(800, 600)
|
|
|
|
|
|
player_id = self.game_facade.join_game(name.strip())
|
|
if not player_id:
|
|
return current_state, {"error": "Failed to join game"}, [], self.ui._generate_world_html(800, 600)
|
|
|
|
|
|
current_state.update({"player_id": player_id, "joined": True, "name": name.strip()})
|
|
|
|
|
|
self.current_player_id = player_id
|
|
|
|
|
|
status = {"player_id": player_id, "status": f"Joined as {name.strip()}"}
|
|
rows = self._get_online_players_data()
|
|
world_html = self._generate_world_html_with_players()
|
|
return current_state, status, rows, world_html
|
|
|
|
def _handle_leave_game(self, current_state: Dict) -> Tuple[Dict, Dict, List[List[str]], str]:
|
|
"""Handle player leaving the game: clear state and return updated UI components."""
|
|
|
|
player_id = current_state.get("player_id")
|
|
success = self.game_facade.leave_game(player_id)
|
|
|
|
new_state: Dict = {}
|
|
self.current_player_id = None
|
|
status = {"status": "Left game" if success else "Leave failed"}
|
|
rows = self._get_online_players_data()
|
|
world_html = self._generate_world_html_with_players()
|
|
return new_state, status, rows, world_html
|
|
|
|
def _handle_movement(self, direction: str, current_state: Dict) -> Tuple[Dict, str, str]:
|
|
"""Handle player movement with enhanced collision detection."""
|
|
if not current_state.get("joined"):
|
|
return current_state, self._generate_world_html_with_players(), "Join the game to move!"
|
|
|
|
try:
|
|
player_id = current_state.get("player_id")
|
|
success, new_position, events = self.game_facade.move_player(player_id, direction)
|
|
|
|
if success:
|
|
current_state.update(new_position)
|
|
world_html = self._generate_world_html_with_players()
|
|
event_text = "<br>".join(events) if events else f"Moved {direction}"
|
|
return current_state, world_html, event_text
|
|
else:
|
|
return current_state, self._generate_world_html_with_players(), f"Cannot move {direction} - blocked by obstacle!"
|
|
|
|
except Exception as e:
|
|
return current_state, self._generate_world_html_with_players(), f"Movement error: {str(e)}"
|
|
|
|
def _handle_action(self, current_state: Dict) -> str:
|
|
"""Handle special action button."""
|
|
if not current_state.get("joined"):
|
|
return "Join the game to use actions!"
|
|
|
|
try:
|
|
player_id = current_state.get("player_id")
|
|
result = self.game_facade.handle_action(player_id)
|
|
return result
|
|
|
|
except Exception as e:
|
|
return f"Action error: {str(e)}"
|
|
|
|
def _handle_chat_message(self, message: str, current_state: Dict) -> Tuple[List[Dict], str]:
|
|
"""Handle public chat messages with state context."""
|
|
|
|
if not message or not message.strip() or not current_state.get("joined"):
|
|
return [], ""
|
|
try:
|
|
player_id = current_state.get("player_id")
|
|
|
|
self.game_facade.send_chat_message(player_id, message.strip())
|
|
|
|
history = self.game_facade.get_chat_history(20)
|
|
formatted = self._format_chat_messages(history)
|
|
|
|
return formatted, ""
|
|
except Exception:
|
|
return [{"role": "assistant", "content": "Chat error"}], ""
|
|
|
|
def _handle_start_chat(self, entity_selection: str, chat_tabs_state: Dict, current_state: Dict) -> Tuple[Dict, str, gr.Chatbot, gr.Group]:
|
|
"""Handle starting a private chat."""
|
|
if not entity_selection or not current_state.get("joined"):
|
|
return chat_tabs_state, gr.update(value=self._generate_chat_tabs_html(chat_tabs_state)), gr.update(visible=False), gr.update(visible=False)
|
|
|
|
try:
|
|
|
|
entity_id = self.entity_name_to_id.get(entity_selection, entity_selection)
|
|
entity_name = entity_selection
|
|
|
|
if entity_id not in chat_tabs_state:
|
|
chat_tabs_state[entity_id] = {
|
|
"name": entity_name,
|
|
"active": True,
|
|
"pinned": False,
|
|
"unread": 0 }
|
|
|
|
|
|
for tab_id in chat_tabs_state:
|
|
chat_tabs_state[tab_id]["active"] = (tab_id == entity_id)
|
|
|
|
player_id = current_state.get("player_id")
|
|
messages = self.game_facade.get_private_messages(player_id, entity_id)
|
|
tabs_html = self._generate_chat_tabs_html(chat_tabs_state)
|
|
return chat_tabs_state, gr.update(value=tabs_html), gr.update(value=messages, visible=True), gr.update(visible=True)
|
|
|
|
except Exception as e:
|
|
return chat_tabs_state, gr.update(value=f"Error: {str(e)}"), gr.update(visible=False), gr.update(visible=False)
|
|
|
|
def _handle_private_message(self, message: str, chat_tabs_state: Dict, current_state: Dict) -> Tuple[List[Dict], str]:
|
|
"""Handle private chat messages."""
|
|
|
|
if not message or not message.strip() or not current_state.get("joined"):
|
|
return [], ""
|
|
|
|
|
|
entity_id = next((eid for eid, info in chat_tabs_state.items() if info.get("active")), None)
|
|
if not entity_id:
|
|
return [], ""
|
|
|
|
print(f"[InterfaceManager] Sending private message from {current_state.get('player_id')} to {entity_id}: '{message.strip()}'")
|
|
|
|
|
|
player_id = current_state.get("player_id")
|
|
player_name = current_state.get("name", "Unknown")
|
|
success = self.game_facade.send_private_message(player_id, player_name, entity_id, message.strip())
|
|
|
|
if success:
|
|
|
|
messages = self.game_facade.get_private_messages(player_id, entity_id)
|
|
|
|
formatted = []
|
|
for msg in messages[-20:]:
|
|
|
|
role = "user" if msg.get("sender_id") == player_id else "assistant"
|
|
|
|
message_content = msg.get('message', '').replace('\n', '<br>')
|
|
content = f"[{msg.get('timestamp', '')}] {msg.get('sender', 'Unknown')}: {message_content}"
|
|
formatted.append({"role": role, "content": content})
|
|
|
|
print(f"[InterfaceManager] Formatted {len(formatted)} private messages for display")
|
|
return formatted, ""
|
|
else:
|
|
print(f"[InterfaceManager] Failed to send private message")
|
|
return [], ""
|
|
|
|
def _handle_clear_all_chats(self, chat_tabs_state: Dict) -> Tuple[Dict, str, gr.Chatbot, gr.Group]:
|
|
"""Handle clearing all private chat tabs."""
|
|
chat_tabs_state.clear()
|
|
tabs_html = self._generate_chat_tabs_html(chat_tabs_state)
|
|
return chat_tabs_state, tabs_html, gr.update(visible=False), gr.update(visible=False)
|
|
|
|
def _handle_register_ai_agent(self, ai_name: str) -> str:
|
|
"""Handle AI agent registration."""
|
|
if not ai_name or not ai_name.strip():
|
|
return "❌ Please enter a valid AI agent name"
|
|
|
|
try:
|
|
|
|
import uuid
|
|
agent_id = f"test_agent_{uuid.uuid4().hex[:8]}"
|
|
|
|
|
|
result = self.game_facade.register_ai_agent(agent_id, ai_name.strip())
|
|
|
|
if result:
|
|
|
|
if not hasattr(self, 'registered_agents'):
|
|
self.registered_agents = {}
|
|
self.registered_agents[agent_id] = ai_name.strip()
|
|
|
|
return f"✅ AI agent '{ai_name.strip()}' registered successfully!\nAgent ID: {agent_id}"
|
|
else:
|
|
return f"❌ Failed to register AI agent '{ai_name.strip()}'"
|
|
|
|
except Exception as e:
|
|
return f"❌ Error registering AI agent: {str(e)}"
|
|
|
|
def _handle_execute_ai_action(self, action: str, message: str) -> str:
|
|
"""Handle AI agent action execution."""
|
|
if not hasattr(self, 'registered_agents') or not self.registered_agents:
|
|
return "❌ No AI agents registered. Please register an AI agent first!"
|
|
|
|
if not action:
|
|
return "❌ Please select an action to execute"
|
|
|
|
try:
|
|
|
|
agent_id = list(self.registered_agents.keys())[0]
|
|
agent_name = self.registered_agents[agent_id]
|
|
|
|
if action.startswith("move"):
|
|
|
|
direction = action.split()[1] if len(action.split()) > 1 else action
|
|
result = self.game_facade.move_ai_agent(agent_id, direction)
|
|
|
|
if result.get("success"):
|
|
position = result.get("position", {})
|
|
return f"🤖 {agent_name} moved {direction} successfully!\nNew position: ({position.get('x', '?')}, {position.get('y', '?')})"
|
|
else:
|
|
error_msg = result.get("error", "Movement failed")
|
|
return f"❌ {agent_name} movement failed: {error_msg}"
|
|
|
|
elif action == "chat":
|
|
if not message or not message.strip():
|
|
return "❌ Please enter a chat message"
|
|
|
|
result = self.game_facade.ai_agent_chat(agent_id, message.strip())
|
|
|
|
if result.get("success"):
|
|
return f"💬 {agent_name} sent chat message: '{message.strip()}'"
|
|
else:
|
|
error_msg = result.get("error", "Chat failed")
|
|
return f"❌ {agent_name} chat failed: {error_msg}"
|
|
else:
|
|
return f"❓ Unknown action: {action}"
|
|
except Exception as e:
|
|
return f"❌ Error executing AI action: {str(e)}"
|
|
|
|
def _handle_test_raw_world_data(self) -> Dict:
|
|
"""Handle testing raw world data access."""
|
|
try:
|
|
raw_data = self.game_facade.get_raw_world_data()
|
|
return raw_data
|
|
except Exception as e:
|
|
return {"error": f"Failed to get raw world data: {str(e)}"}
|
|
|
|
def _handle_test_available_npcs(self) -> List[Dict]:
|
|
"""Handle testing available NPCs data access."""
|
|
try:
|
|
npcs_data = self.game_facade.get_available_npcs()
|
|
return npcs_data
|
|
except Exception as e:
|
|
return [{"error": f"Failed to get NPCs data: {str(e)}"}]
|
|
|
|
def _auto_refresh_game_state(self, current_state: Dict, auto_refresh: bool):
|
|
"""Auto-refresh game state and UI elements with update objects."""
|
|
if not auto_refresh or not current_state.get("joined"):
|
|
return (
|
|
gr.update(),
|
|
gr.update(),
|
|
gr.update(),
|
|
gr.update(),
|
|
gr.update(),
|
|
gr.update(visible=False),
|
|
gr.update(),
|
|
gr.update(),
|
|
)
|
|
try:
|
|
|
|
player_id = current_state.get("player_id")
|
|
|
|
player_info_data = self.game_facade.get_player_stats(player_id)
|
|
online_players_data = self._get_online_players_data()
|
|
world_html = self._generate_world_html_with_players()
|
|
proximity_html, nearby_choices, show_private_chat = self._get_proximity_info(player_id)
|
|
|
|
chat_history = self.game_facade.get_chat_history(20)
|
|
formatted_chat = self._format_chat_messages(chat_history)
|
|
|
|
|
|
world_events_data = self._get_formatted_world_events()
|
|
|
|
|
|
return (
|
|
gr.update(value=player_info_data),
|
|
gr.update(value=online_players_data),
|
|
gr.update(value=world_html),
|
|
gr.update(value=proximity_html),
|
|
gr.update(choices=nearby_choices),
|
|
gr.update(visible=show_private_chat),
|
|
gr.update(value=formatted_chat),
|
|
gr.update(value=world_events_data),
|
|
)
|
|
except Exception as e:
|
|
print(f"Auto-refresh error: {e}")
|
|
return (
|
|
gr.update(),
|
|
gr.update(),
|
|
gr.update(),
|
|
gr.update(),
|
|
gr.update(),
|
|
gr.update(visible=False),
|
|
gr.update(),
|
|
gr.update(),
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
def _get_online_players_data(self) -> List[List[str]]:
|
|
"""Get formatted online players data."""
|
|
try:
|
|
players_data = []
|
|
for player in self.game_facade.get_all_players().values():
|
|
players_data.append([
|
|
player.name,
|
|
"🤖 AI" if player.type == "ai_agent" else "👤 Human",
|
|
str(player.level)
|
|
])
|
|
return players_data
|
|
except Exception:
|
|
return []
|
|
|
|
def _generate_world_html_with_players(self) -> str:
|
|
"""Generate world HTML with current player positions, glow effects, and player names."""
|
|
try:
|
|
|
|
base_html = self.ui._generate_world_html(800, 600)
|
|
|
|
|
|
current_player_id = getattr(self, 'current_player_id', None)
|
|
|
|
|
|
players_html = ""
|
|
all_players = self.game_facade.get_all_players()
|
|
|
|
for player in all_players.values():
|
|
player_icon = "🤖" if player.type == "ai_agent" else "🧑🦰"
|
|
is_current = player.id == current_player_id
|
|
|
|
|
|
border_style = "border: 2px solid yellow; border-radius: 50%;" if is_current else ""
|
|
|
|
|
|
players_html += f'''
|
|
<div style="position: absolute; left: {player.x}px; top: {player.y}px;
|
|
font-size: 20px; z-index: 10; {border_style}
|
|
text-shadow: 2px 2px 4px rgba(0,0,0,0.5);"
|
|
title="{player.name} (Level {player.level})">
|
|
{player_icon}
|
|
</div>
|
|
'''
|
|
|
|
|
|
name_background = "rgba(255,215,0,0.9)" if is_current else "rgba(255,215,0,0.6)"
|
|
players_html += f'''
|
|
<div style="position: absolute; left: {player.x - 15}px; top: {player.y - 15}px;
|
|
background: {name_background}; color: black; padding: 1px 4px;
|
|
border-radius: 3px; font-size: 8px; font-weight: bold; z-index: 11;">
|
|
{player.name} (Lv.{player.level})
|
|
</div>
|
|
'''
|
|
|
|
|
|
import time
|
|
player_count = len(all_players)
|
|
current_time = time.strftime('%H:%M:%S')
|
|
|
|
|
|
width, height = 800, 600
|
|
|
|
status_html = f'''
|
|
<!--<div style="position: absolute; top: 10px; left: 50%; transform: translateX(-50%);
|
|
text-align: center; background: rgba(0,0,0,0.1); padding: 10px;
|
|
border-radius: 5px; font-family: Arial, sans-serif; z-index: 12;">
|
|
-->
|
|
<div style="position: absolute; top: 10px; left: 10px; background: rgba(255,255,255,0.9);
|
|
padding: 10px; border-radius: 8px; font-size: 12px; box-shadow: 0 2px 5px rgba(0,0,0,0.2);">
|
|
🌍 <strong>Fantasy Realm</strong> |
|
|
Players: {player_count}/20 |
|
|
Last Update: {current_time}
|
|
<br>
|
|
<!-- 🎮 <strong>Use WASD or Arrow Keys to move, Space for Action</strong>
|
|
--> <br>
|
|
🧑🦰 Players | 📮🏪🚶🧙🌤️ NPCs | 🌳 Trees
|
|
</div>
|
|
'''
|
|
|
|
|
|
enhanced_content = players_html + status_html
|
|
if enhanced_content:
|
|
base_html = base_html.replace(
|
|
'<div id="players-container"></div>',
|
|
f'<div id="players-container">{enhanced_content}</div>'
|
|
)
|
|
|
|
return base_html
|
|
|
|
except Exception as e:
|
|
print(f"Error generating world HTML: {e}")
|
|
return self.ui._generate_world_html(800, 600)
|
|
|
|
def _format_chat_messages(self, messages: List[Dict]) -> List[Dict]:
|
|
"""Format chat messages for Gradio chatbot."""
|
|
formatted = []
|
|
for msg in messages[-20:]:
|
|
role = "assistant" if msg.get("sender") == "System" else "user"
|
|
|
|
message_content = msg.get('message', '').replace('\n', '<br>')
|
|
content = f"[{msg.get('timestamp', '')}] {msg.get('sender', 'Unknown')}: {message_content}"
|
|
formatted.append({"role": role, "content": content})
|
|
return formatted
|
|
|
|
def _generate_chat_tabs_html(self, chat_tabs_state: Dict) -> str:
|
|
"""Generate HTML for chat tabs display."""
|
|
if not chat_tabs_state:
|
|
return "<div style='text-align: center; color: #666; padding: 10px;'>No active chats</div>"
|
|
|
|
tabs_html = "<div style='display: flex; flex-wrap: wrap; gap: 5px; padding: 5px;'>"
|
|
for entity_id, tab_info in chat_tabs_state.items():
|
|
active_class = "active" if tab_info.get('active') else ""
|
|
pin_icon = "📌" if tab_info.get('pinned') else ""
|
|
unread_badge = f" ({tab_info.get('unread', 0)})" if tab_info.get('unread', 0) > 0 else ""
|
|
|
|
tabs_html += f'''
|
|
<div class="chat-tab {active_class}"
|
|
style="display: flex; align-items: center; gap: 5px;"
|
|
title="Entity ID: {entity_id}">
|
|
<span>{tab_info['name']}{unread_badge}</span>
|
|
{pin_icon}
|
|
<span style="color: #f44336; font-weight: bold; cursor: pointer;" title="Close tab">×</span>
|
|
</div>
|
|
'''
|
|
tabs_html += "</div>"
|
|
return tabs_html
|
|
|
|
def _get_entity_name(self, entity_id: str) -> str:
|
|
"""Get display name for an entity."""
|
|
try:
|
|
world = self.game_engine.get_world()
|
|
|
|
|
|
npc = world.npcs.get(entity_id)
|
|
if npc:
|
|
return npc.get("name", entity_id)
|
|
|
|
|
|
player = world.players.get(entity_id)
|
|
if player:
|
|
return player.name
|
|
|
|
return entity_id
|
|
except Exception:
|
|
return entity_id
|
|
|
|
def _get_proximity_info(self, player_id: str) -> Tuple[str, List[str], bool]:
|
|
"""Get proximity information and nearby entities."""
|
|
try:
|
|
proximity_data = self.game_facade.get_proximity_info(player_id)
|
|
|
|
nearby_entities = proximity_data.get("nearby_entities", [])
|
|
if nearby_entities:
|
|
proximity_html = f"<div style='color: #4caf50;'>📡 Found {len(nearby_entities)} nearby entities</div>"
|
|
choices = []
|
|
self.entity_name_to_id = {}
|
|
for entity in nearby_entities:
|
|
entity_id = entity['id']
|
|
entity_name = self._get_entity_name(entity_id)
|
|
|
|
self.entity_name_to_id[entity_name] = entity_id
|
|
choices.append(entity_name)
|
|
|
|
return proximity_html, choices, True
|
|
else:
|
|
proximity_html = "<div style='color: #666;'>🔍 Move near NPCs or players to chat privately</div>"
|
|
return proximity_html, [], False
|
|
|
|
except Exception as e:
|
|
return f"<div style='color: #f44336;'>Error: {str(e)}</div>", [], False
|
|
|
|
def _get_formatted_world_events(self) -> str:
|
|
"""Get formatted world events for display."""
|
|
try:
|
|
|
|
world_events = self.game_facade.get_world_events()
|
|
if not world_events:
|
|
return "No recent world events...\n\n💡 Tip: Walk near NPCs (📮🏪🌤️) to interact with them!\nThen visit the 'NPC Add-ons' tab to use their features."
|
|
|
|
|
|
formatted_events = []
|
|
for event in world_events[-10:]:
|
|
timestamp = event.get('timestamp', '')
|
|
event_text = event.get('event', '')
|
|
formatted_events.append(f"[{timestamp}] {event_text}")
|
|
|
|
return "\n".join(formatted_events)
|
|
|
|
except Exception as e:
|
|
print(f"Error formatting world events: {e}")
|
|
return "Error loading world events..."
|
|
|