|
"""
|
|
Plugin management service implementation.
|
|
"""
|
|
|
|
import os
|
|
import importlib.util
|
|
import importlib
|
|
import sys
|
|
from typing import Dict, List, Optional, Any
|
|
from ..interfaces.service_interfaces import IPluginService
|
|
from ..interfaces.plugin_interfaces import (
|
|
IPlugin, PluginMetadata, PluginError,
|
|
PluginInitializationError, PluginDependencyError
|
|
)
|
|
|
|
|
|
class PluginService(IPluginService):
|
|
"""Service for managing the plugin system."""
|
|
|
|
def __init__(self, plugins_directory: str = "plugins"):
|
|
self.plugins_directory = plugins_directory
|
|
self.loaded_plugins: Dict[str, IPlugin] = {}
|
|
self.plugin_modules: Dict[str, Any] = {}
|
|
self.game_context: Dict[str, Any] = {}
|
|
|
|
def set_game_context(self, context: Dict[str, Any]):
|
|
"""Set the game context for plugins."""
|
|
self.game_context = context
|
|
|
|
def load_plugins(self) -> int:
|
|
"""Load all plugins from plugins directory."""
|
|
if not os.path.exists(self.plugins_directory):
|
|
os.makedirs(self.plugins_directory)
|
|
print(f"[PluginService] Created plugins directory: {self.plugins_directory}")
|
|
return 0
|
|
|
|
loaded_count = 0
|
|
plugin_files = [f for f in os.listdir(self.plugins_directory)
|
|
if f.endswith('.py') and not f.startswith('__')]
|
|
|
|
for plugin_file in plugin_files:
|
|
plugin_path = os.path.join(self.plugins_directory, plugin_file)
|
|
if self.load_plugin(plugin_path):
|
|
loaded_count += 1
|
|
|
|
print(f"[PluginService] Loaded {loaded_count}/{len(plugin_files)} plugins")
|
|
return loaded_count
|
|
|
|
def load_plugin(self, plugin_path: str) -> bool:
|
|
"""Load a specific plugin."""
|
|
try:
|
|
if not os.path.exists(plugin_path):
|
|
print(f"[PluginService] Plugin file not found: {plugin_path}")
|
|
return False
|
|
|
|
|
|
plugin_name = os.path.splitext(os.path.basename(plugin_path))[0]
|
|
|
|
|
|
spec = importlib.util.spec_from_file_location(plugin_name, plugin_path)
|
|
if not spec or not spec.loader:
|
|
print(f"[PluginService] Failed to create spec for {plugin_path}")
|
|
return False
|
|
|
|
module = importlib.util.module_from_spec(spec)
|
|
|
|
|
|
spec.loader.exec_module(module)
|
|
|
|
plugin_class = None
|
|
plugin_candidates = []
|
|
|
|
for attr_name in dir(module):
|
|
attr = getattr(module, attr_name)
|
|
if (isinstance(attr, type) and
|
|
issubclass(attr, IPlugin) and
|
|
attr != IPlugin):
|
|
|
|
if not (attr_name.startswith('I') and hasattr(attr, '__abstractmethods__') and attr.__abstractmethods__):
|
|
plugin_candidates.append((attr_name, attr))
|
|
|
|
|
|
for attr_name, attr in plugin_candidates:
|
|
if attr.__module__ == module.__name__:
|
|
plugin_class = attr
|
|
break
|
|
|
|
|
|
if not plugin_class and plugin_candidates:
|
|
plugin_class = plugin_candidates[0][1]
|
|
|
|
if not plugin_class:
|
|
print(f"[PluginService] No IPlugin implementation found in {plugin_path}")
|
|
return False
|
|
|
|
|
|
plugin_instance = plugin_class()
|
|
|
|
|
|
metadata = plugin_instance.metadata
|
|
if not isinstance(metadata, PluginMetadata):
|
|
print(f"[PluginService] Invalid metadata in plugin {plugin_name}")
|
|
return False
|
|
|
|
|
|
if not self._check_dependencies(metadata.dependencies):
|
|
print(f"[PluginService] Dependencies not met for plugin {metadata.id}")
|
|
return False
|
|
|
|
|
|
if not plugin_instance.initialize(self.game_context):
|
|
print(f"[PluginService] Failed to initialize plugin {metadata.id}")
|
|
return False
|
|
|
|
|
|
self.loaded_plugins[metadata.id] = plugin_instance
|
|
self.plugin_modules[metadata.id] = module
|
|
|
|
print(f"[PluginService] Successfully loaded plugin: {metadata.name} v{metadata.version}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"[PluginService] Error loading plugin {plugin_path}: {e}")
|
|
return False
|
|
|
|
def unload_plugin(self, plugin_id: str) -> bool:
|
|
"""Unload a plugin."""
|
|
try:
|
|
if plugin_id not in self.loaded_plugins:
|
|
return False
|
|
|
|
plugin = self.loaded_plugins[plugin_id]
|
|
|
|
|
|
plugin.shutdown()
|
|
|
|
|
|
del self.loaded_plugins[plugin_id]
|
|
|
|
|
|
if plugin_id in self.plugin_modules:
|
|
module = self.plugin_modules[plugin_id]
|
|
module_name = getattr(module, '__name__', None)
|
|
if module_name and module_name in sys.modules:
|
|
del sys.modules[module_name]
|
|
del self.plugin_modules[plugin_id]
|
|
|
|
print(f"[PluginService] Unloaded plugin: {plugin_id}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"[PluginService] Error unloading plugin {plugin_id}: {e}")
|
|
return False
|
|
|
|
def get_loaded_plugins(self) -> List[str]:
|
|
"""Get list of loaded plugin IDs."""
|
|
return list(self.loaded_plugins.keys())
|
|
|
|
def reload_plugin(self, plugin_id: str) -> bool:
|
|
"""Reload a plugin (hot-reload)."""
|
|
try:
|
|
if plugin_id not in self.loaded_plugins:
|
|
return False
|
|
|
|
|
|
plugin_files = [f for f in os.listdir(self.plugins_directory)
|
|
if f.endswith('.py')]
|
|
|
|
plugin_file = None
|
|
for f in plugin_files:
|
|
plugin_path = os.path.join(self.plugins_directory, f)
|
|
|
|
|
|
if plugin_id in f.lower():
|
|
plugin_file = plugin_path
|
|
break
|
|
|
|
if not plugin_file:
|
|
print(f"[PluginService] Could not find file for plugin {plugin_id}")
|
|
return False
|
|
|
|
|
|
self.unload_plugin(plugin_id)
|
|
|
|
|
|
return self.load_plugin(plugin_file)
|
|
|
|
except Exception as e:
|
|
print(f"[PluginService] Error reloading plugin {plugin_id}: {e}")
|
|
return False
|
|
|
|
def get_plugin(self, plugin_id: str) -> Optional[IPlugin]:
|
|
"""Get a loaded plugin instance."""
|
|
return self.loaded_plugins.get(plugin_id)
|
|
|
|
def get_plugins_by_type(self, plugin_type: str) -> List[IPlugin]:
|
|
"""Get all plugins of a specific type."""
|
|
result = []
|
|
for plugin in self.loaded_plugins.values():
|
|
if plugin.metadata.plugin_type.value == plugin_type:
|
|
result.append(plugin)
|
|
return result
|
|
|
|
def call_plugin_event(self, event_name: str, *args, **kwargs):
|
|
"""Call an event on all loaded plugins."""
|
|
for plugin in self.loaded_plugins.values():
|
|
try:
|
|
if hasattr(plugin, event_name):
|
|
method = getattr(plugin, event_name)
|
|
if callable(method):
|
|
method(*args, **kwargs)
|
|
except Exception as e:
|
|
print(f"[PluginService] Error calling {event_name} on plugin {plugin.metadata.id}: {e}")
|
|
|
|
def get_plugin_status(self, plugin_id: str) -> Optional[Dict[str, Any]]:
|
|
"""Get status of a specific plugin."""
|
|
plugin = self.loaded_plugins.get(plugin_id)
|
|
if not plugin:
|
|
return None
|
|
|
|
try:
|
|
status = plugin.get_status()
|
|
status['metadata'] = {
|
|
'id': plugin.metadata.id,
|
|
'name': plugin.metadata.name,
|
|
'version': plugin.metadata.version,
|
|
'type': plugin.metadata.plugin_type.value,
|
|
'author': plugin.metadata.author
|
|
}
|
|
return status
|
|
except Exception as e:
|
|
return {'error': str(e)}
|
|
|
|
def get_all_plugin_status(self) -> Dict[str, Dict[str, Any]]:
|
|
"""Get status of all loaded plugins."""
|
|
result = {}
|
|
for plugin_id in self.loaded_plugins:
|
|
result[plugin_id] = self.get_plugin_status(plugin_id)
|
|
return result
|
|
|
|
def _check_dependencies(self, dependencies: List[str]) -> bool:
|
|
"""Check if plugin dependencies are satisfied."""
|
|
|
|
for dep in dependencies:
|
|
if dep not in self.loaded_plugins:
|
|
return False
|
|
return True
|
|
|
|
def create_sample_plugin(self) -> bool:
|
|
"""Create a sample plugin file for demonstration."""
|
|
try:
|
|
sample_plugin_content = '''"""
|
|
Sample plugin demonstrating the plugin system.
|
|
"""
|
|
|
|
from src.interfaces.plugin_interfaces import IPlugin, PluginMetadata, PluginType
|
|
from typing import Dict, Any
|
|
|
|
|
|
class SamplePlugin(IPlugin):
|
|
"""Sample plugin implementation."""
|
|
|
|
@property
|
|
def metadata(self) -> PluginMetadata:
|
|
return PluginMetadata(
|
|
id="sample_plugin",
|
|
name="Sample Plugin",
|
|
version="1.0.0",
|
|
description="A sample plugin demonstrating the plugin system",
|
|
author="MMORPG System",
|
|
plugin_type=PluginType.EVENT,
|
|
dependencies=[],
|
|
config={}
|
|
)
|
|
|
|
def initialize(self, context: Dict[str, Any]) -> bool:
|
|
"""Initialize the plugin."""
|
|
self.context = context
|
|
print("[SamplePlugin] Plugin initialized!")
|
|
return True
|
|
|
|
def shutdown(self) -> bool:
|
|
"""Shutdown the plugin."""
|
|
print("[SamplePlugin] Plugin shutdown!")
|
|
return True
|
|
|
|
def get_status(self) -> Dict[str, Any]:
|
|
"""Get plugin status."""
|
|
return {
|
|
"status": "active",
|
|
"message": "Sample plugin is running normally"
|
|
}
|
|
|
|
def on_player_join(self, player_id: str) -> None:
|
|
"""Called when a player joins."""
|
|
print(f"[SamplePlugin] Player {player_id} joined!")
|
|
|
|
def on_player_leave(self, player_id: str) -> None:
|
|
"""Called when a player leaves."""
|
|
print(f"[SamplePlugin] Player {player_id} left!")
|
|
|
|
def on_chat_message(self, sender_id: str, message: str, message_type: str) -> None:
|
|
"""Called when a chat message is sent."""
|
|
if "hello plugin" in message.lower():
|
|
print(f"[SamplePlugin] Detected greeting from {sender_id}")
|
|
'''
|
|
|
|
sample_path = os.path.join(self.plugins_directory, "sample_plugin.py")
|
|
with open(sample_path, 'w', encoding='utf-8') as f:
|
|
f.write(sample_plugin_content)
|
|
|
|
print(f"[PluginService] Created sample plugin at {sample_path}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"[PluginService] Error creating sample plugin: {e}")
|
|
return False
|
|
|