MMORPG_AI_NPC_MCP_CLIENT_SERVER / plugins /enhanced_chat_plugin.py.backup
Chris4K's picture
Upload 195 files
4c75d73 verified
import asyncio
import time
from typing import Dict, List, Any

from src.interfaces.plugin_interfaces import IChatPlugin, PluginMetadata, PluginType
class EnhancedChatPlugin(IChatPlugin):
    """Chat plugin offering slash commands, channels, ignore lists and mail.

    All chat state (history, ignore lists, channel membership) is held in
    memory on the instance.

    NOTE(review): the plugin reads ``self.game_manager`` (for
    ``get_connected_players()`` and ``get_addon()``) but nothing in this class
    assigns it; the host is presumably expected to attach it. Until it is
    attached, broadcasts reach nobody and the mail commands report that the
    mail system is unavailable instead of raising ``AttributeError``.
    """

    def __init__(self):
        """Create the metadata, the command dispatch table and empty chat state."""
        self._metadata = PluginMetadata(
            id="enhanced_chat",
            name="Enhanced Chat System",
            version="1.2.0",
            author="MMORPG Dev Team",
            description="Adds emotes, chat channels, and advanced chat commands",
            plugin_type=PluginType.SERVICE,
            dependencies=[],
            config={
                "enable_emotes": True,
                "enable_channels": True,
                "max_message_length": 500,
                "chat_cooldown": 1
            }
        )
        self._enabled = False
        # Maps the lower-cased command word to the coroutine method handling it.
        self.commands = {
            '/tell': self.tell_command,
            '/whisper': self.whisper_command,
            '/shout': self.shout_command,
            '/emote': self.emote_command,
            '/me': self.emote_command,  # Alias for /emote
            '/who': self.who_command,
            '/time': self.time_command,
            '/help': self.help_command,
            '/commands': self.help_command,  # Alias for /help
            '/clear': self.clear_command,
            '/ignore': self.ignore_command,
            '/unignore': self.unignore_command,
            '/history': self.history_command,
            '/channels': self.channels_command,
            '/join': self.join_channel_command,
            '/leave': self.leave_channel_command,
            '/channel': self.channel_command,
            '/c': self.channel_command,  # Alias for /channel
            '/mail': self.mail_command,
            '/read': self.read_mail_command,
            '/mailbox': self.mailbox_command,
            '/marketplace': self.marketplace_command,
            '/trade': self.trade_command,
            '/auction': self.auction_command,
        }
        # Chat history storage: list of {'timestamp', 'message'} dicts, oldest first.
        self.chat_history = []
        self.max_history = 100
        # Player ignore lists: player_id -> set of player IDs that player ignores.
        self.ignore_lists = {}
        # Custom channels: channel name -> description and set of member IDs.
        self.channels = {
            'global': {'description': 'Global chat channel', 'members': set()},
            'trade': {'description': 'Trading channel', 'members': set()},
            'help': {'description': 'Help and questions channel', 'members': set()}
        }
        # Player channel memberships: player_id -> set of joined channel names.
        self.player_channels = {}

    @property
    def metadata(self) -> PluginMetadata:
        """Return the plugin's metadata object."""
        return self._metadata

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Initialize the enhanced chat plugin.

        Copies the metadata config to ``self._config`` and marks the plugin
        enabled. ``context`` is accepted but not read.

        Returns:
            True on success, False if an exception was raised.
        """
        try:
            self._config = self._metadata.config
            self._enabled = True
            print("💬 Enhanced Chat Plugin initialized")
            return True
        except Exception as e:
            print(f"💬 Failed to initialize Enhanced Chat Plugin: {e}")
            return False

    def shutdown(self) -> bool:
        """Mark the plugin disabled; always returns True."""
        self._enabled = False
        print("💬 Enhanced Chat Plugin shut down")
        return True

    def get_status(self) -> Dict[str, Any]:
        """Return a dict with ``status`` ("active"/"inactive") and a ``message``."""
        return {
            "status": "active" if self._enabled else "inactive",
            "message": "Enhanced chat system with marketplace commands"
        }

    def process_message(self, player_id: str, message: str, channel: str = "global") -> Dict[str, Any]:
        """Route a message to command processing or to a broadcast.

        Messages starting with ``/`` go to ``process_command``; everything
        else goes to ``broadcast_message``. ``channel`` is currently unused.

        NOTE(review): both targets are coroutine functions, so despite the
        annotation this returns an un-awaited coroutine that the caller must
        await. Left as-is because existing callers may already await it;
        confirm against ``IChatPlugin`` before changing the return type.
        """
        if message.startswith('/'):
            return self.process_command(player_id, message)
        return self.broadcast_message(player_id, message)

    def get_available_commands(self) -> List[Dict[str, str]]:
        """Return a fixed summary list of commands with descriptions."""
        return [
            {"command": "/marketplace", "description": "View marketplace listings"},
            {"command": "/trade create <item> <price>", "description": "Create trade listing"},
            {"command": "/trade accept <id>", "description": "Accept trade offer"},
            {"command": "/auction create <item> <start_price>", "description": "Create auction"},
            {"command": "/tell <player> <message>", "description": "Send private message"},
            {"command": "/who", "description": "List online players"},
            {"command": "/help", "description": "Show available commands"}
        ]

    def get_chat_channels(self) -> Dict[str, Dict[str, str]]:
        """Return a fixed description of the three built-in channels."""
        return {
            "global": {"name": "Global", "description": "Main chat channel"},
            "trade": {"name": "Trade", "description": "Trading discussions"},
            "help": {"name": "Help", "description": "Help and questions"}
        }

    # Internal helpers

    def _get_game_manager(self):
        """Return the attached game manager, or None when none has been set."""
        return getattr(self, "game_manager", None)

    def _connected_players(self):
        """Return a snapshot list of connected player IDs (empty without a manager)."""
        manager = self._get_game_manager()
        if manager is None:
            return []
        # Copy so that awaits inside a delivery loop cannot observe mutation.
        return list(manager.get_connected_players() or [])

    def _get_addon(self, addon_name):
        """Return the named addon from the game manager, or None without a manager."""
        manager = self._get_game_manager()
        if manager is None:
            return None
        return manager.get_addon(addon_name)

    def _is_ignoring(self, listener_id, speaker_id):
        """Return True when ``listener_id`` has ``speaker_id`` on their ignore list."""
        return speaker_id in self.ignore_lists.get(listener_id, ())

    async def _send_to_recipients(self, sender_id, recipient_ids, text):
        """Send ``text`` to each recipient who is not ignoring ``sender_id``."""
        for recipient_id in recipient_ids:
            if not self._is_ignoring(recipient_id, sender_id):
                await self.send_to_player(recipient_id, text)

    # Message entry points

    async def on_player_message(self, player_id, message):
        """Handle a player chat message: run it as a command or broadcast it."""
        if message.startswith('/'):
            await self.process_command(player_id, message)
        else:
            await self.broadcast_message(player_id, message)

    async def process_command(self, player_id, message):
        """Parse ``message`` into a command word and arguments and dispatch it.

        The command word is matched case-insensitively against
        ``self.commands``. Any exception raised by a handler is reported back
        to the player rather than propagated.
        """
        # Split on any whitespace run so repeated spaces do not produce
        # empty tokens; an empty message yields an empty command word.
        parts = message.split(None, 1)
        command = parts[0].lower() if parts else ""
        args = parts[1].strip() if len(parts) > 1 else ""
        handler = self.commands.get(command)
        if handler is None:
            await self.send_to_player(player_id, f"Unknown command: {command}. Type /help for available commands.")
            return
        try:
            await handler(player_id, args)
        except Exception as e:
            await self.send_to_player(player_id, f"Error executing command: {str(e)}")

    async def broadcast_message(self, player_id, message):
        """Record a chat line in history and send it to all connected players.

        Players who have the sender on their ignore list are skipped.
        """
        player_name = await self.get_player_name(player_id)
        formatted_message = f"{player_name}: {message}"
        self.add_to_history(formatted_message)
        await self._send_to_recipients(player_id, self._connected_players(), formatted_message)

    def add_to_history(self, message):
        """Append ``message`` with a monotonic timestamp and trim old entries.

        The history is kept to at most ``self.max_history`` entries.
        """
        try:
            # Same clock the original used when called from a coroutine.
            now = asyncio.get_running_loop().time()
        except RuntimeError:
            # No running loop (e.g. called from synchronous code).
            now = time.monotonic()
        self.chat_history.append({
            'timestamp': now,
            'message': message
        })
        # Drop every excess entry, not just one, so a lowered max_history
        # takes effect immediately.
        excess = len(self.chat_history) - self.max_history
        if excess > 0:
            del self.chat_history[:excess]

    # Commands

    async def tell_command(self, player_id, args):
        """Send a private message: ``/tell <player> <message>``."""
        parts = args.split(None, 1) if args else []
        if len(parts) < 2:
            await self.send_to_player(player_id, "Usage: /tell <player> <message>")
            return
        target_name, message = parts
        target_id = await self.find_player_by_name(target_name)
        if not target_id:
            await self.send_to_player(player_id, f"Player '{target_name}' not found.")
            return
        if self._is_ignoring(player_id, target_id):
            await self.send_to_player(player_id, f"You are ignoring {target_name}.")
            return
        sender_name = await self.get_player_name(player_id)
        # A target who ignores the sender does not receive the message; the
        # sender still gets the normal echo so the ignore is not disclosed.
        if not self._is_ignoring(target_id, player_id):
            await self.send_to_player(target_id, f"[TELL from {sender_name}]: {message}")
        await self.send_to_player(player_id, f"[TELL to {target_name}]: {message}")

    async def whisper_command(self, player_id, args):
        """Alias for the tell command."""
        await self.tell_command(player_id, args)

    async def shout_command(self, player_id, args):
        """Send ``<name> shouts: <message>`` to all connected players."""
        if not args:
            await self.send_to_player(player_id, "Usage: /shout <message>")
            return
        player_name = await self.get_player_name(player_id)
        formatted_message = f"{player_name} shouts: {args}"
        self.add_to_history(formatted_message)
        # Sent to all players (could be modified to only nearby players).
        await self._send_to_recipients(player_id, self._connected_players(), formatted_message)

    async def emote_command(self, player_id, args):
        """Send an emote line (``* <name> <action>``) to all connected players."""
        if not args:
            await self.send_to_player(player_id, "Usage: /emote <action> or /me <action>")
            return
        player_name = await self.get_player_name(player_id)
        formatted_message = f"* {player_name} {args}"
        self.add_to_history(formatted_message)
        await self._send_to_recipients(player_id, self._connected_players(), formatted_message)

    async def who_command(self, player_id, args):
        """Send the requesting player the list of online player names."""
        player_names = []
        for pid in self._connected_players():
            player_names.append(await self.get_player_name(pid))
        if player_names:
            message = f"Online players ({len(player_names)}): {', '.join(player_names)}"
        else:
            message = "No players currently online."
        await self.send_to_player(player_id, message)

    async def time_command(self, player_id, args):
        """Send the server's current local wall-clock time (HH:MM:SS)."""
        import datetime
        current_time = datetime.datetime.now().strftime("%H:%M:%S")
        await self.send_to_player(player_id, f"Current time: {current_time}")

    async def help_command(self, player_id, args):
        """Send the full list of commands, one line per command."""
        commands = [
            "/tell <player> <msg> - Send private message",
            "/whisper <player> <msg> - Same as /tell",
            "/shout <message> - Shout to all players",
            "/emote <action> - Perform an action",
            "/me <action> - Same as /emote",
            "/who - Show online players",
            "/time - Show current time",
            "/help - Show this help",
            "/clear - Clear your chat",
            "/ignore <player> - Ignore a player",
            "/unignore <player> - Stop ignoring a player",
            "/history - Show recent chat history",
            "/channels - Show available channels",
            "/join <channel> - Join a channel",
            "/leave <channel> - Leave a channel",
            "/channel <channel> <msg> - Send message to channel",
            "/c <channel> <msg> - Same as /channel",
            "/mail <player> <msg> - Send mail to player",
            "/read - Read your mail",
            "/mailbox - Check mailbox status",
            "/marketplace - View marketplace listings",
            "/trade create <item> <price> - Create trade offer",
            "/trade accept <id> - Accept trade offer",
            "/auction create <item> <start_price> - Create auction"
        ]
        await self.send_to_player(player_id, "Available commands:")
        for cmd in commands:
            await self.send_to_player(player_id, f" {cmd}")

    async def clear_command(self, player_id, args):
        """Push 20 blank lines to the player, then confirm the clear."""
        # This would typically clear the client-side chat
        await self.send_to_player(player_id, "\n" * 20)
        await self.send_to_player(player_id, "Chat cleared.")

    async def ignore_command(self, player_id, args):
        """Add a player to the caller's ignore list: ``/ignore <player>``."""
        target_name = args.strip() if args else ""
        if not target_name:
            await self.send_to_player(player_id, "Usage: /ignore <player>")
            return
        target_id = await self.find_player_by_name(target_name)
        if not target_id:
            await self.send_to_player(player_id, f"Player '{target_name}' not found.")
            return
        if target_id == player_id:
            await self.send_to_player(player_id, "You cannot ignore yourself.")
            return
        self.ignore_lists.setdefault(player_id, set()).add(target_id)
        await self.send_to_player(player_id, f"You are now ignoring {target_name}.")

    async def unignore_command(self, player_id, args):
        """Remove a player from the caller's ignore list: ``/unignore <player>``."""
        target_name = args.strip() if args else ""
        if not target_name:
            await self.send_to_player(player_id, "Usage: /unignore <player>")
            return
        target_id = await self.find_player_by_name(target_name)
        if not target_id:
            await self.send_to_player(player_id, f"Player '{target_name}' not found.")
            return
        if self._is_ignoring(player_id, target_id):
            self.ignore_lists[player_id].remove(target_id)
            await self.send_to_player(player_id, f"You are no longer ignoring {target_name}.")
        else:
            await self.send_to_player(player_id, f"You are not ignoring {target_name}.")

    async def history_command(self, player_id, args):
        """Send the player the last 10 recorded chat lines."""
        if not self.chat_history:
            await self.send_to_player(player_id, "No chat history available.")
            return
        await self.send_to_player(player_id, "Recent chat history:")
        # Snapshot slice: later appends during the awaits do not affect it.
        for entry in self.chat_history[-10:]:
            await self.send_to_player(player_id, entry['message'])

    async def channels_command(self, player_id, args):
        """List every channel with its description and member count."""
        await self.send_to_player(player_id, "Available channels:")
        # Snapshot the items so concurrent changes cannot break iteration.
        for channel_name, channel_info in list(self.channels.items()):
            member_count = len(channel_info['members'])
            await self.send_to_player(player_id, f" {channel_name}: {channel_info['description']} ({member_count} members)")

    async def join_channel_command(self, player_id, args):
        """Join an existing channel and notify its other members."""
        channel_name = args.strip().lower() if args else ""
        if not channel_name:
            await self.send_to_player(player_id, "Usage: /join <channel>")
            return
        if channel_name not in self.channels:
            await self.send_to_player(player_id, f"Channel '{channel_name}' does not exist.")
            return
        self.player_channels.setdefault(player_id, set()).add(channel_name)
        members = self.channels[channel_name]['members']
        members.add(player_id)
        player_name = await self.get_player_name(player_id)
        await self.send_to_player(player_id, f"You have joined the '{channel_name}' channel.")
        # Notify other channel members; iterate a copy because the live set
        # may change while a send is awaited.
        for member_id in tuple(members):
            if member_id != player_id:
                await self.send_to_player(member_id, f"{player_name} has joined the '{channel_name}' channel.")

    async def leave_channel_command(self, player_id, args):
        """Leave a joined channel and notify the remaining members."""
        channel_name = args.strip().lower() if args else ""
        if not channel_name:
            await self.send_to_player(player_id, "Usage: /leave <channel>")
            return
        if channel_name not in self.channels:
            await self.send_to_player(player_id, f"Channel '{channel_name}' does not exist.")
            return
        if channel_name not in self.player_channels.get(player_id, ()):
            await self.send_to_player(player_id, f"You are not in the '{channel_name}' channel.")
            return
        self.player_channels[player_id].remove(channel_name)
        members = self.channels[channel_name]['members']
        members.discard(player_id)
        player_name = await self.get_player_name(player_id)
        await self.send_to_player(player_id, f"You have left the '{channel_name}' channel.")
        # Notify remaining members from a snapshot of the membership set.
        for member_id in tuple(members):
            await self.send_to_player(member_id, f"{player_name} has left the '{channel_name}' channel.")

    async def channel_command(self, player_id, args):
        """Send a message to a joined channel: ``/channel <channel> <message>``."""
        parts = args.split(None, 1) if args else []
        if len(parts) < 2:
            await self.send_to_player(player_id, "Usage: /channel <channel> <message>")
            return
        channel_name, message = parts
        channel_name = channel_name.lower()
        if channel_name not in self.channels:
            await self.send_to_player(player_id, f"Channel '{channel_name}' does not exist.")
            return
        if channel_name not in self.player_channels.get(player_id, ()):
            await self.send_to_player(player_id, f"You are not in the '{channel_name}' channel. Use /join {channel_name} first.")
            return
        player_name = await self.get_player_name(player_id)
        formatted_message = f"[{channel_name.upper()}] {player_name}: {message}"
        # Deliver to a snapshot of the members, skipping those ignoring the sender.
        recipients = tuple(self.channels[channel_name]['members'])
        await self._send_to_recipients(player_id, recipients, formatted_message)

    async def mail_command(self, player_id, args):
        """Send mail through the 'read2burn' addon: ``/mail <player> <message>``."""
        parts = args.split(None, 1) if args else []
        if len(parts) < 2:
            await self.send_to_player(player_id, "Usage: /mail <player> <message>")
            return
        target_name, message = parts
        target_id = await self.find_player_by_name(target_name)
        if not target_id:
            await self.send_to_player(player_id, f"Player '{target_name}' not found.")
            return
        # Use the read2burn addon to send mail
        try:
            read2burn_addon = self._get_addon('read2burn')
            if read2burn_addon:
                sender_name = await self.get_player_name(player_id)
                await read2burn_addon.send_message(target_id, f"Mail from {sender_name}: {message}")
                await self.send_to_player(player_id, f"Mail sent to {target_name}.")
            else:
                await self.send_to_player(player_id, "Mail system is not available.")
        except Exception as e:
            await self.send_to_player(player_id, f"Failed to send mail: {str(e)}")

    async def read_mail_command(self, player_id, args):
        """Show the player's unread mail, then clear it via the addon."""
        try:
            read2burn_addon = self._get_addon('read2burn')
            if not read2burn_addon:
                await self.send_to_player(player_id, "Mail system is not available.")
                return
            messages = await read2burn_addon.get_messages(player_id)
            if not messages:
                await self.send_to_player(player_id, "You have no unread mail.")
                return
            await self.send_to_player(player_id, f"You have {len(messages)} unread message(s):")
            for number, msg in enumerate(messages, start=1):
                await self.send_to_player(player_id, f"{number}. {msg}")
            # Messages are deleted after reading (read2burn)
            await read2burn_addon.clear_messages(player_id)
        except Exception as e:
            await self.send_to_player(player_id, f"Failed to read mail: {str(e)}")

    async def mailbox_command(self, player_id, args):
        """Report how many unread messages the addon holds for the player."""
        try:
            read2burn_addon = self._get_addon('read2burn')
            if read2burn_addon:
                message_count = await read2burn_addon.get_message_count(player_id)
                await self.send_to_player(player_id, f"You have {message_count} unread message(s) in your mailbox.")
            else:
                await self.send_to_player(player_id, "Mail system is not available.")
        except Exception as e:
            await self.send_to_player(player_id, f"Failed to check mailbox: {str(e)}")

    async def marketplace_command(self, player_id, args):
        """Send a hard-coded demo list of marketplace listings."""
        # Demo marketplace listings - in a real implementation, this would query the trading system
        demo_listings = [
            "1. Iron Sword - 100 gold (seller: Alice)",
            "2. Magic Potion - 50 gold (seller: Bob)",
            "3. Dragon Scale - 500 gold (seller: Charlie)",
            "4. Enchanted Ring - 250 gold (seller: Diana)"
        ]
        await self.send_to_player(player_id, "=== MARKETPLACE ===")
        await self.send_to_player(player_id, "Current listings:")
        for listing in demo_listings:
            await self.send_to_player(player_id, f" {listing}")
        await self.send_to_player(player_id, "Use '/trade create <item> <price>' to list an item")
        await self.send_to_player(player_id, "Use '/trade accept <id>' to purchase an item")

    async def trade_command(self, player_id, args):
        """Handle ``/trade create <item> <price>`` and ``/trade accept <id>``.

        Only confirmation messages are sent; no trade state is stored.
        """
        parts = args.split(None, 1) if args else []
        if not parts:
            await self.send_to_player(player_id, "Usage: /trade <create|accept> [arguments]")
            return
        action = parts[0].lower()
        if action == 'create':
            # The last token is the price; everything before it is the item name.
            create_args = parts[1].split() if len(parts) > 1 else []
            if len(create_args) < 2:
                await self.send_to_player(player_id, "Usage: /trade create <item> <price>")
                return
            price = create_args[-1]
            item = ' '.join(create_args[:-1])
            await self.send_to_player(player_id, f"Created trade listing: {item} for {price} gold")
            # In a real implementation, this would interact with the trading system plugin
        elif action == 'accept':
            listing_id = parts[1].strip() if len(parts) > 1 else ""
            if not listing_id:
                await self.send_to_player(player_id, "Usage: /trade accept <listing_id>")
                return
            await self.send_to_player(player_id, f"Attempting to purchase listing {listing_id}...")
            # In a real implementation, this would process the trade
        else:
            await self.send_to_player(player_id, "Unknown trade action. Use 'create' or 'accept'.")

    async def auction_command(self, player_id, args):
        """Handle ``/auction create <item> <starting_price>``.

        Only a confirmation message is sent; no auction state is stored.
        """
        parts = args.split(None, 1) if args else []
        if not parts:
            await self.send_to_player(player_id, "Usage: /auction create <item> <starting_price>")
            return
        action = parts[0].lower()
        if action != 'create':
            await self.send_to_player(player_id, "Unknown auction action. Use 'create' to start an auction.")
            return
        # The last token is the starting price; the rest is the item name.
        create_args = parts[1].split() if len(parts) > 1 else []
        if len(create_args) < 2:
            await self.send_to_player(player_id, "Usage: /auction create <item> <starting_price>")
            return
        starting_price = create_args[-1]
        item = ' '.join(create_args[:-1])
        await self.send_to_player(player_id, f"Created auction: {item} starting at {starting_price} gold")
        # In a real implementation, this would interact with the auction system

    # Helper methods

    async def get_player_name(self, player_id):
        """Return a display name for ``player_id`` (placeholder: ``Player<id>``)."""
        # This should be implemented to get the actual player name
        return f"Player{player_id}"

    async def find_player_by_name(self, name):
        """Return the player ID for ``name`` (placeholder: always None)."""
        # This should be implemented to find players by name
        # For now, return None if not found
        return None

    async def send_to_player(self, player_id, message):
        """Send ``message`` to one player (placeholder: does nothing)."""
        # This should be implemented to send messages to players
        pass