|
import gradio as gr |
|
import asyncio |
|
import threading |
|
from typing import Optional |
|
|
|
|
|
from src.core.game_engine import GameEngine |
|
from src.facades.game_facade import GameFacade |
|
from src.ui.huggingface_ui import HuggingFaceUI |
|
from src.ui.improved_interface_manager import ImprovedInterfaceManager |
|
from src.mcp.mcp_tools import GradioMCPTools |
|
|
|
class MMORPGApplication:
    """Main application class that orchestrates the MMORPG game.

    Builds the game engine, the game facade, the UI, the interface manager
    and the MCP tools, and owns the two daemon threads started by
    ``start_background_tasks``.
    """

    def __init__(self):
        """Initialize the application with all necessary components."""
        self.game_engine = GameEngine()

        # One facade instance is handed to the UI, the interface manager
        # and the MCP tools.
        self.game_facade = GameFacade()
        self.ui = HuggingFaceUI(self.game_facade)
        self.interface_manager = ImprovedInterfaceManager(self.game_facade, self.ui)

        self.mcp_tools = GradioMCPTools(self.game_facade)

        # Populated by create_gradio_interface().
        self.gradio_interface: Optional[gr.Blocks] = None

        # Worker threads; populated by start_background_tasks().
        self._cleanup_task: Optional[threading.Thread] = None
        self._auto_refresh_task: Optional[threading.Thread] = None

    def create_gradio_interface(self) -> gr.Blocks:
        """Create the Gradio interface, store it on the instance and return it."""
        self.gradio_interface = self.interface_manager.create_interface()
        return self.gradio_interface

    def start_background_tasks(self):
        """Start the background daemon threads used for game maintenance.

        Calling this more than once is safe: a worker thread that is still
        alive is left running instead of being started a second time.
        """

        def cleanup_task():
            """Background task for cleaning up inactive players."""
            while True:
                try:
                    self.game_facade.cleanup_inactive_players()
                    # Waiting on a fresh, never-set Event blocks for the
                    # full timeout, i.e. it acts as a sleep.
                    threading.Event().wait(30)
                except Exception as e:
                    # Keep the worker alive; retry after a shorter pause.
                    print(f"Error in cleanup task: {e}")
                    threading.Event().wait(5)

        def auto_refresh_task():
            """Background task for auto-refreshing game state."""
            while True:
                try:
                    # No refresh work is performed here at present; the
                    # loop only waits between iterations.
                    threading.Event().wait(2)
                except Exception as e:
                    print(f"Error in auto-refresh task: {e}")
                    threading.Event().wait(5)

        # Daemon threads do not keep the process alive on shutdown.
        if self._cleanup_task is None or not self._cleanup_task.is_alive():
            self._cleanup_task = threading.Thread(target=cleanup_task, daemon=True)
            self._cleanup_task.start()

        if self._auto_refresh_task is None or not self._auto_refresh_task.is_alive():
            self._auto_refresh_task = threading.Thread(
                target=auto_refresh_task, daemon=True
            )
            self._auto_refresh_task.start()

    def run(self, share: bool = False, server_port: int = 7860):
        """Run the MMORPG application.

        Starts the game engine, builds the interface, starts the background
        tasks and launches the Gradio server. Returns early, without
        launching anything, if the engine reports that it failed to start.

        Args:
            share: Passed to ``launch``; when true a public URL is requested.
            server_port: Passed to ``launch`` as the port to serve on. This
                is also the port shown in the startup message.
        """
        print("🎮 Starting MMORPG Application...")
        print("🏗️ Initializing game engine...")

        if not self.game_engine.start():
            print("❌ Failed to start game engine")
            return

        print("🎨 Creating user interface...")
        interface = self.create_gradio_interface()

        print("🔧 Starting background tasks...")
        self.start_background_tasks()

        print("🚀 Launching server...")
        print(f"🌐 Server will be available at: http://localhost:{server_port}")

        if share:
            print("🔗 Public URL will be generated...")

        # Forward the caller's choices instead of hard-coding them, so the
        # messages printed above match what is actually launched.
        interface.launch(
            share=share,
            server_port=server_port,
            debug=True,
            mcp_server=True,
            show_error=True,
            quiet=False,
        )
|
|
|
def main():
    """Build the application and run it.

    Calls ``MMORPGApplication.run`` with ``share=True`` and
    ``server_port=7869``.
    """
    MMORPGApplication().run(share=True, server_port=7869)
|
|
|
# Start the application only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()