Chris4K's picture
Update app.py
6f37a99 verified
import gradio as gr
import asyncio
import threading
from typing import Optional
# Import our clean architecture components
from src.core.game_engine import GameEngine
from src.facades.game_facade import GameFacade
from src.ui.huggingface_ui import HuggingFaceUI
from src.ui.improved_interface_manager import ImprovedInterfaceManager
from src.mcp.mcp_tools import GradioMCPTools
class MMORPGApplication:
"""Main application class that orchestrates the MMORPG game."""
def __init__(self):
"""Initialize the application with all necessary components."""
# Initialize core game engine (singleton)
self.game_engine = GameEngine()
# Initialize game facade for simplified operations
self.game_facade = GameFacade() # Initialize UI components
self.ui = HuggingFaceUI(self.game_facade)
self.interface_manager = ImprovedInterfaceManager(self.game_facade, self.ui)
# Initialize MCP tools for AI agent integration
self.mcp_tools = GradioMCPTools(self.game_facade)
# Gradio interface reference
self.gradio_interface: Optional[gr.Blocks] = None
# Background tasks
self._cleanup_task = None
self._auto_refresh_task = None
def create_gradio_interface(self) -> gr.Blocks:
"""Create and configure the Gradio interface."""
# Use the ImprovedInterfaceManager to create the complete interface with proper event handling
self.gradio_interface = self.interface_manager.create_interface()
return self.gradio_interface
def start_background_tasks(self):
"""Start background tasks for game maintenance."""
def cleanup_task():
"""Background task for cleaning up inactive players."""
while True:
try:
self.game_facade.cleanup_inactive_players()
threading.Event().wait(30) # Wait 30 seconds
except Exception as e:
print(f"Error in cleanup task: {e}")
threading.Event().wait(5) # Wait before retry
def auto_refresh_task():
"""Background task for auto-refreshing game state."""
while True:
try:
# Trigger refresh for active sessions
# This would need session tracking for real implementation
threading.Event().wait(2) # Refresh every 2 seconds
except Exception as e:
print(f"Error in auto-refresh task: {e}")
threading.Event().wait(5) # Wait before retry
# Start cleanup task
self._cleanup_task = threading.Thread(target=cleanup_task, daemon=True)
self._cleanup_task.start()
# Start auto-refresh task
self._auto_refresh_task = threading.Thread(target=auto_refresh_task, daemon=True)
self._auto_refresh_task.start()
def run(self, share: bool = False, server_port: int = 7860):
"""Run the MMORPG application."""
print("🎮 Starting MMORPG Application...")
print("🏗️ Initializing game engine...")
# Initialize game world and services
if not self.game_engine.start():
print("❌ Failed to start game engine")
return
print("🎨 Creating user interface...")
# Create Gradio interface
interface = self.create_gradio_interface()
print("🔧 Starting background tasks...")
# Start background maintenance tasks
self.start_background_tasks()
print("🚀 Launching server...")
print(f"🌐 Server will be available at: http://localhost:{server_port}")
if share:
print("🔗 Public URL will be generated...")
# Launch the interface
interface.launch(
share=True, #override
debug=True,
# server_port=server_port,
mcp_server=True, # Enable MCP server integration
show_error=True,
quiet=False
)
def main():
"""Main entry point for the application."""
# Create and run the application
app = MMORPGApplication()
# Run with default settings
# Change share=True to make it publicly accessible
app.run(share=True, server_port=7869)
if __name__ == "__main__":
main()