# NOTE: hosting-page residue, kept as a comment so the module parses:
# Chris4K's picture / Upload 195 files / 4c75d73 verified
"""
Plugin management service implementation.
"""
import os
import importlib.util
import importlib
import sys
from typing import Dict, List, Optional, Any
from ..interfaces.service_interfaces import IPluginService
from ..interfaces.plugin_interfaces import (
IPlugin, PluginMetadata, PluginError,
PluginInitializationError, PluginDependencyError
)
class PluginService(IPluginService):
    """Service for managing the plugin system.

    A plugin is a single ``.py`` file. Loading one executes the file as a
    module, picks a concrete ``IPlugin`` subclass from it, instantiates it,
    validates its metadata and dependencies, calls ``initialize`` with the
    current game context and registers the instance under ``metadata.id``.

    Progress and errors are reported with ``print``; the public methods
    signal failure through their return value instead of raising.
    """

    def __init__(self, plugins_directory: str = "plugins"):
        """Create an empty service that looks for plugins in *plugins_directory*."""
        self.plugins_directory = plugins_directory
        # plugin id -> live plugin instance
        self.loaded_plugins: Dict[str, IPlugin] = {}
        # plugin id -> module object the plugin class came from
        self.plugin_modules: Dict[str, Any] = {}
        # dict passed to every plugin's initialize()
        self.game_context: Dict[str, Any] = {}

    def set_game_context(self, context: Dict[str, Any]) -> None:
        """Set the game context for plugins.

        Only plugins loaded after this call receive the new context; already
        initialized plugins are not re-initialized.
        """
        self.game_context = context

    def load_plugins(self) -> int:
        """Load all plugins from the plugins directory.

        Creates the directory (and returns 0) when it does not exist yet.
        Files whose name starts with ``__`` are ignored.

        Returns:
            Number of plugins that loaded successfully.
        """
        if not os.path.exists(self.plugins_directory):
            os.makedirs(self.plugins_directory)
            print(f"[PluginService] Created plugins directory: {self.plugins_directory}")
            return 0

        # os.listdir order is arbitrary, and _check_dependencies only accepts
        # dependencies that are already loaded, so sort to make the load
        # order (and therefore the outcome) deterministic.
        plugin_files = sorted(
            f for f in os.listdir(self.plugins_directory)
            if f.endswith('.py') and not f.startswith('__')
        )

        loaded_count = 0
        for plugin_file in plugin_files:
            plugin_path = os.path.join(self.plugins_directory, plugin_file)
            if self.load_plugin(plugin_path):
                loaded_count += 1

        print(f"[PluginService] Loaded {loaded_count}/{len(plugin_files)} plugins")
        return loaded_count

    def load_plugin(self, plugin_path: str) -> bool:
        """Load a specific plugin from *plugin_path*.

        The module name is the file name without extension. The module is
        executed but not inserted into ``sys.modules`` by this method.

        Returns:
            True when the plugin was instantiated, validated, initialized and
            registered; False on any failure (the reason is printed).
        """
        try:
            if not os.path.exists(plugin_path):
                print(f"[PluginService] Plugin file not found: {plugin_path}")
                return False

            # Extract plugin name from file path
            plugin_name = os.path.splitext(os.path.basename(plugin_path))[0]

            # Load module
            spec = importlib.util.spec_from_file_location(plugin_name, plugin_path)
            if not spec or not spec.loader:
                print(f"[PluginService] Failed to create spec for {plugin_path}")
                return False

            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)

            plugin_class = self._find_plugin_class(module)
            if plugin_class is None:
                print(f"[PluginService] No IPlugin implementation found in {plugin_path}")
                return False

            # Instantiate plugin
            plugin_instance = plugin_class()

            # Check metadata
            metadata = plugin_instance.metadata
            if not isinstance(metadata, PluginMetadata):
                print(f"[PluginService] Invalid metadata in plugin {plugin_name}")
                return False

            # Check dependencies
            if not self._check_dependencies(metadata.dependencies):
                print(f"[PluginService] Dependencies not met for plugin {metadata.id}")
                return False

            # Initialize plugin
            if not plugin_instance.initialize(self.game_context):
                print(f"[PluginService] Failed to initialize plugin {metadata.id}")
                return False

            # Store plugin together with its module; the module's __file__
            # is what reload_plugin later uses to find the source file.
            self.loaded_plugins[metadata.id] = plugin_instance
            self.plugin_modules[metadata.id] = module

            print(f"[PluginService] Successfully loaded plugin: {metadata.name} v{metadata.version}")
            return True

        except Exception as e:
            # Plugin code is third-party: any failure while executing or
            # initializing it must not take the service down.
            print(f"[PluginService] Error loading plugin {plugin_path}: {e}")
            return False

    @staticmethod
    def _find_plugin_class(module: Any) -> Optional[type]:
        """Return the concrete IPlugin subclass to instantiate from *module*.

        Classes defined in *module* itself win over classes it merely
        imported; returns None when there is no candidate.
        """
        candidates = []
        for attr_name in dir(module):
            attr = getattr(module, attr_name)
            if not (isinstance(attr, type) and issubclass(attr, IPlugin)):
                continue
            if attr is IPlugin:
                continue
            # Abstract classes (imported interfaces or abstract bases of any
            # name) cannot be instantiated, so they are never candidates.
            if getattr(attr, '__abstractmethods__', None):
                continue
            candidates.append(attr)

        for candidate in candidates:
            if candidate.__module__ == module.__name__:
                return candidate
        return candidates[0] if candidates else None

    def unload_plugin(self, plugin_id: str) -> bool:
        """Unload a plugin.

        Calls the plugin's ``shutdown`` and removes it from the registry. If
        ``shutdown`` raises, the plugin stays registered and False is
        returned.

        Returns:
            True when the plugin was unloaded, False when it was not loaded
            or an error occurred.
        """
        try:
            if plugin_id not in self.loaded_plugins:
                return False

            plugin = self.loaded_plugins[plugin_id]

            # Shutdown plugin
            plugin.shutdown()

            # Remove from loaded plugins
            del self.loaded_plugins[plugin_id]

            # Drop the module from sys.modules only when the entry really is
            # this plugin's module: the module name is just the file name,
            # so it may coincide with an unrelated imported module.
            module = self.plugin_modules.pop(plugin_id, None)
            module_name = getattr(module, '__name__', None)
            if module_name and sys.modules.get(module_name) is module:
                del sys.modules[module_name]

            print(f"[PluginService] Unloaded plugin: {plugin_id}")
            return True

        except Exception as e:
            print(f"[PluginService] Error unloading plugin {plugin_id}: {e}")
            return False

    def get_loaded_plugins(self) -> List[str]:
        """Get list of loaded plugin IDs."""
        return list(self.loaded_plugins)

    def reload_plugin(self, plugin_id: str) -> bool:
        """Reload a plugin (hot-reload).

        The plugin is unloaded first and then loaded again from its source
        file; if the fresh load fails the plugin ends up unloaded.

        Returns:
            True when the plugin was loaded again, False when it was not
            loaded to begin with, its file could not be found, or loading
            failed.
        """
        try:
            if plugin_id not in self.loaded_plugins:
                return False

            plugin_file = self._find_plugin_file(plugin_id)
            if not plugin_file:
                print(f"[PluginService] Could not find file for plugin {plugin_id}")
                return False

            # Unload current plugin
            self.unload_plugin(plugin_id)

            # Load updated plugin
            return self.load_plugin(plugin_file)

        except Exception as e:
            print(f"[PluginService] Error reloading plugin {plugin_id}: {e}")
            return False

    def _find_plugin_file(self, plugin_id: str) -> Optional[str]:
        """Return the source file of a loaded plugin, or None if unknown."""
        # Preferred: the path recorded on the module when it was loaded.
        module = self.plugin_modules.get(plugin_id)
        recorded_path = getattr(module, '__file__', None)
        if recorded_path and os.path.exists(recorded_path):
            return recorded_path

        # Fallback heuristic: a plugin file whose name contains the id.
        wanted = plugin_id.lower()
        for file_name in sorted(os.listdir(self.plugins_directory)):
            if not file_name.endswith('.py') or file_name.startswith('__'):
                continue
            if wanted in file_name.lower():
                return os.path.join(self.plugins_directory, file_name)
        return None

    def get_plugin(self, plugin_id: str) -> Optional[IPlugin]:
        """Get a loaded plugin instance, or None when it is not loaded."""
        return self.loaded_plugins.get(plugin_id)

    def get_plugins_by_type(self, plugin_type: str) -> List[IPlugin]:
        """Get all plugins whose ``metadata.plugin_type.value`` equals *plugin_type*."""
        return [
            plugin for plugin in self.loaded_plugins.values()
            if plugin.metadata.plugin_type.value == plugin_type
        ]

    def call_plugin_event(self, event_name: str, *args, **kwargs):
        """Call an event on all loaded plugins.

        Plugins that have no callable attribute named *event_name* are
        skipped. An exception raised by one plugin is printed and does not
        prevent the remaining plugins from being called.
        """
        # Iterate over a snapshot: a handler may load or unload plugins,
        # which would otherwise change the dict during iteration.
        for plugin in list(self.loaded_plugins.values()):
            try:
                method = getattr(plugin, event_name, None)
                if callable(method):
                    method(*args, **kwargs)
            except Exception as e:
                print(f"[PluginService] Error calling {event_name} on plugin {plugin.metadata.id}: {e}")

    def get_plugin_status(self, plugin_id: str) -> Optional[Dict[str, Any]]:
        """Get status of a specific plugin.

        Returns:
            None when the plugin is not loaded; otherwise the plugin's own
            status dict extended with a ``'metadata'`` entry, or
            ``{'error': <message>}`` when building the status failed.
        """
        plugin = self.loaded_plugins.get(plugin_id)
        if plugin is None:
            return None

        try:
            # Copy so the dict owned by the plugin is not modified.
            status = dict(plugin.get_status())
            status['metadata'] = {
                'id': plugin.metadata.id,
                'name': plugin.metadata.name,
                'version': plugin.metadata.version,
                'type': plugin.metadata.plugin_type.value,
                'author': plugin.metadata.author
            }
            return status
        except Exception as e:
            return {'error': str(e)}

    def get_all_plugin_status(self) -> Dict[str, Dict[str, Any]]:
        """Get status of all loaded plugins, keyed by plugin ID."""
        return {
            plugin_id: self.get_plugin_status(plugin_id)
            for plugin_id in list(self.loaded_plugins)
        }

    def _check_dependencies(self, dependencies: List[str]) -> bool:
        """Check if plugin dependencies are satisfied.

        A dependency counts as satisfied when a plugin with that ID is
        already loaded. ``None`` is treated as "no dependencies".
        """
        return all(dep in self.loaded_plugins for dep in (dependencies or ()))

    def create_sample_plugin(self) -> bool:
        """Create a sample plugin file for demonstration.

        Writes ``sample_plugin.py`` into the plugins directory (creating the
        directory when needed), overwriting an existing file of that name.

        Returns:
            True when the file was written, False on error.
        """
        try:
            sample_plugin_content = '''"""
Sample plugin demonstrating the plugin system.
"""

from src.interfaces.plugin_interfaces import IPlugin, PluginMetadata, PluginType
from typing import Dict, Any


class SamplePlugin(IPlugin):
    """Sample plugin implementation."""

    @property
    def metadata(self) -> PluginMetadata:
        return PluginMetadata(
            id="sample_plugin",
            name="Sample Plugin",
            version="1.0.0",
            description="A sample plugin demonstrating the plugin system",
            author="MMORPG System",
            plugin_type=PluginType.EVENT,
            dependencies=[],
            config={}
        )

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Initialize the plugin."""
        self.context = context
        print("[SamplePlugin] Plugin initialized!")
        return True

    def shutdown(self) -> bool:
        """Shutdown the plugin."""
        print("[SamplePlugin] Plugin shutdown!")
        return True

    def get_status(self) -> Dict[str, Any]:
        """Get plugin status."""
        return {
            "status": "active",
            "message": "Sample plugin is running normally"
        }

    def on_player_join(self, player_id: str) -> None:
        """Called when a player joins."""
        print(f"[SamplePlugin] Player {player_id} joined!")

    def on_player_leave(self, player_id: str) -> None:
        """Called when a player leaves."""
        print(f"[SamplePlugin] Player {player_id} left!")

    def on_chat_message(self, sender_id: str, message: str, message_type: str) -> None:
        """Called when a chat message is sent."""
        if "hello plugin" in message.lower():
            print(f"[SamplePlugin] Detected greeting from {sender_id}")
'''
            # The directory may not exist yet when this is called before
            # load_plugins().
            os.makedirs(self.plugins_directory, exist_ok=True)

            sample_path = os.path.join(self.plugins_directory, "sample_plugin.py")
            with open(sample_path, 'w', encoding='utf-8') as f:
                f.write(sample_plugin_content)

            print(f"[PluginService] Created sample plugin at {sample_path}")
            return True

        except Exception as e:
            print(f"[PluginService] Error creating sample plugin: {e}")
            return False