import asyncio
import time
from typing import Any, Dict, Iterable, List, Optional

from src.interfaces.plugin_interfaces import IChatPlugin, PluginMetadata, PluginType


class EnhancedChatPlugin(IChatPlugin):
    """Chat plugin with slash commands, channels, ignore lists and history.

    State kept on the instance:

    * ``commands``        -- maps a slash command to its async handler.
    * ``chat_history``    -- list of ``{'timestamp', 'message'}`` dicts,
      trimmed to the newest ``max_history`` entries.
    * ``ignore_lists``    -- ``player_id -> set`` of player ids that player
      is ignoring.
    * ``channels``        -- ``name -> {'description', 'members'}``.
    * ``player_channels`` -- ``player_id -> set`` of joined channel names.

    The connected-player list and the ``read2burn`` mail addon are read from
    a ``game_manager`` attribute that this class never assigns itself.
    NOTE(review): presumably the host attaches it after construction --
    confirm. When the attribute is missing, broadcasts reach nobody and the
    mail commands answer "Mail system is not available." instead of raising.

    ``get_player_name``, ``find_player_by_name`` and ``send_to_player`` are
    placeholders in this class (see their docstrings).

    The ``config`` values in the metadata (``enable_emotes``,
    ``enable_channels``, ``max_message_length``, ``chat_cooldown``) are
    stored but not read by any method of this class.
    """

    def __init__(self):
        """Create the metadata, the command table and empty chat state."""
        self._metadata = PluginMetadata(
            id="enhanced_chat",
            name="Enhanced Chat System",
            version="1.2.0",
            author="MMORPG Dev Team",
            description="Adds emotes, chat channels, and advanced chat commands",
            plugin_type=PluginType.SERVICE,
            dependencies=[],
            config={
                "enable_emotes": True,
                "enable_channels": True,
                "max_message_length": 500,
                "chat_cooldown": 1,
            },
        )
        self._enabled = False

        # Strong references to tasks started by process_message(); the event
        # loop itself only keeps weak references to tasks.
        self._background_tasks = set()

        self.commands = {
            '/tell': self.tell_command,
            '/whisper': self.whisper_command,
            '/shout': self.shout_command,
            '/emote': self.emote_command,
            '/me': self.emote_command,
            '/who': self.who_command,
            '/time': self.time_command,
            '/help': self.help_command,
            '/commands': self.help_command,
            '/clear': self.clear_command,
            '/ignore': self.ignore_command,
            '/unignore': self.unignore_command,
            '/history': self.history_command,
            '/channels': self.channels_command,
            '/join': self.join_channel_command,
            '/leave': self.leave_channel_command,
            '/channel': self.channel_command,
            '/c': self.channel_command,
            '/mail': self.mail_command,
            '/read': self.read_mail_command,
            '/mailbox': self.mailbox_command,
            '/marketplace': self.marketplace_command,
            '/trade': self.trade_command,
            '/auction': self.auction_command,
        }

        self.chat_history = []
        self.max_history = 100

        self.ignore_lists = {}

        self.channels = {
            'global': {'description': 'Global chat channel', 'members': set()},
            'trade': {'description': 'Trading channel', 'members': set()},
            'help': {'description': 'Help and questions channel', 'members': set()},
        }

        self.player_channels = {}

    # ------------------------------------------------------------------
    # Plugin interface
    # ------------------------------------------------------------------

    @property
    def metadata(self) -> PluginMetadata:
        """Return the metadata object built in ``__init__``."""
        return self._metadata

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Initialize the enhanced chat plugin.

        Copies the metadata config to ``self._config`` and marks the plugin
        enabled. ``context`` is accepted but not inspected.

        Returns:
            True on success, False if an exception was raised (it is printed).
        """
        try:
            self._config = self._metadata.config
            self._enabled = True
            print("💬 Enhanced Chat Plugin initialized")
            return True
        except Exception as e:
            print(f"💬 Failed to initialize Enhanced Chat Plugin: {e}")
            return False

    def shutdown(self) -> bool:
        """Mark the plugin disabled and report success."""
        self._enabled = False
        print("💬 Enhanced Chat Plugin shut down")
        return True

    def get_status(self) -> Dict[str, Any]:
        """Return ``status`` ("active"/"inactive") and a fixed ``message``."""
        return {
            "status": "active" if self._enabled else "inactive",
            "message": "Enhanced chat system with marketplace commands",
        }

    def process_message(self, player_id: str, message: str, channel: str = "global") -> Dict[str, Any]:
        """Handle one chat line from synchronous code.

        Messages starting with ``/`` go to ``process_command``; everything
        else goes to ``broadcast_message``. Both are coroutines, so they are
        driven here rather than returned un-awaited:

        * called with a running event loop in this thread, the work is
          scheduled as a task and ``status`` is ``"scheduled"``;
        * otherwise it is run to completion on a private, temporary loop and
          ``status`` is ``"completed"``.

        Async callers that need to wait for delivery should await
        ``on_player_message`` instead.

        Args:
            player_id: Id of the player who sent the line.
            message: Raw chat text.
            channel: Echoed back in the result; it does not affect routing.

        Returns:
            Dict with ``success``, ``status``, ``type`` ("command" or
            "message"), ``player_id``, ``message`` and ``channel``; ``error``
            is added when synchronous handling raised.
        """
        is_command = message.startswith('/')
        handler = self.process_command if is_command else self.broadcast_message
        result: Dict[str, Any] = {
            "success": True,
            "status": "completed",
            "type": "command" if is_command else "message",
            "player_id": player_id,
            "message": message,
            "channel": channel,
        }

        try:
            running_loop = asyncio.get_running_loop()
        except RuntimeError:
            running_loop = None

        if running_loop is not None:
            task = running_loop.create_task(handler(player_id, message))
            self._background_tasks.add(task)
            task.add_done_callback(self._on_background_task_done)
            result["status"] = "scheduled"
            return result

        # No loop in this thread: use a throwaway loop so the process-wide
        # "current event loop" setting is left untouched.
        private_loop = asyncio.new_event_loop()
        try:
            private_loop.run_until_complete(handler(player_id, message))
        except Exception as e:
            result["success"] = False
            result["status"] = "failed"
            result["error"] = str(e)
        finally:
            private_loop.close()
        return result

    def _on_background_task_done(self, task) -> None:
        """Drop a finished task and print its exception, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            print(f"💬 Chat message handling failed: {error}")

    def get_available_commands(self) -> List[Dict[str, str]]:
        """Return a fixed list of ``{"command", "description"}`` entries."""
        return [
            {"command": "/marketplace", "description": "View marketplace listings"},
            {"command": "/trade create <item> <price>", "description": "Create trade listing"},
            {"command": "/trade accept <id>", "description": "Accept trade offer"},
            {"command": "/auction create <item> <start_price>", "description": "Create auction"},
            {"command": "/tell <player> <message>", "description": "Send private message"},
            {"command": "/who", "description": "List online players"},
            {"command": "/help", "description": "Show available commands"},
        ]

    def get_chat_channels(self) -> Dict[str, Dict[str, str]]:
        """Return a fixed name/description table for the three channels.

        This is a literal and is independent of ``self.channels``.
        """
        return {
            "global": {"name": "Global", "description": "Main chat channel"},
            "trade": {"name": "Trade", "description": "Trading discussions"},
            "help": {"name": "Help", "description": "Help and questions"},
        }

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _connected_players(self) -> List[Any]:
        """Return a snapshot list of connected player ids.

        Empty when no ``game_manager`` attribute is present. A copy is
        returned so callers can await while iterating.
        """
        manager = getattr(self, "game_manager", None)
        if manager is None:
            return []
        return list(manager.get_connected_players() or ())

    def _mail_addon(self) -> Optional[Any]:
        """Return the ``read2burn`` addon, or None without a game manager."""
        manager = getattr(self, "game_manager", None)
        if manager is None:
            return None
        return manager.get_addon('read2burn')

    def _is_ignoring(self, listener_id, speaker_id) -> bool:
        """Return True if ``listener_id`` has ``speaker_id`` on their ignore list."""
        return speaker_id in self.ignore_lists.get(listener_id, ())

    async def _deliver(self, sender_id, recipients: Iterable[Any], text) -> None:
        """Send ``text`` to each recipient who is not ignoring ``sender_id``."""
        # Snapshot first: the source may be a live set that another coroutine
        # mutates while this one is suspended in send_to_player().
        for recipient_id in list(recipients):
            if not self._is_ignoring(recipient_id, sender_id):
                await self.send_to_player(recipient_id, text)

    async def _announce(self, sender_id, text) -> None:
        """Record ``text`` in history and deliver it to all connected players."""
        self.add_to_history(text)
        await self._deliver(sender_id, self._connected_players(), text)

    @staticmethod
    def _split_item_and_price(text):
        """Split ``"<item words> <price>"`` into ``(item, price)``.

        Returns None when fewer than two whitespace-separated tokens exist.
        The price is returned as the raw last token; it is not validated.
        """
        tokens = text.split()
        if len(tokens) < 2:
            return None
        *item_words, price = tokens
        return ' '.join(item_words), price

    # ------------------------------------------------------------------
    # Message entry points
    # ------------------------------------------------------------------

    async def on_player_message(self, player_id, message):
        """Handle player chat messages: commands start with ``/``."""
        if message.startswith('/'):
            await self.process_command(player_id, message)
        else:
            await self.broadcast_message(player_id, message)

    async def process_command(self, player_id, message):
        """Parse ``message`` as ``<command> [args]`` and run its handler.

        The command word is matched case-insensitively against
        ``self.commands``. Unknown commands and handler exceptions are
        reported to the player through ``send_to_player``.
        """
        # Split on any whitespace run so "/tell   bob hi" parses cleanly.
        pieces = message.split(None, 1)
        command = pieces[0].lower() if pieces else ""
        args = pieces[1] if len(pieces) > 1 else ""

        handler = self.commands.get(command)
        if handler is None:
            await self.send_to_player(player_id, f"Unknown command: {command}. Type /help for available commands.")
            return

        try:
            await handler(player_id, args)
        except Exception as e:
            # Dispatch boundary: report the failure to the player rather
            # than letting one bad command propagate to the caller.
            await self.send_to_player(player_id, f"Error executing command: {e}")

    async def broadcast_message(self, player_id, message):
        """Broadcast ``"<name>: <message>"`` to all connected players.

        The line is added to history. Recipients who are ignoring the sender
        are skipped.
        """
        player_name = await self.get_player_name(player_id)
        await self._announce(player_id, f"{player_name}: {message}")

    def add_to_history(self, message):
        """Append ``message`` with a timestamp and trim to ``max_history``.

        The timestamp is the running event loop's clock when there is one,
        otherwise ``time.monotonic()``.
        """
        try:
            now = asyncio.get_running_loop().time()
        except RuntimeError:
            now = time.monotonic()

        self.chat_history.append({'timestamp': now, 'message': message})

        overflow = len(self.chat_history) - self.max_history
        if overflow > 0:
            del self.chat_history[:overflow]

    # ------------------------------------------------------------------
    # Direct and area messages
    # ------------------------------------------------------------------

    async def tell_command(self, player_id, args):
        """Send a private message: ``/tell <player> <message>``.

        Refuses when the sender is ignoring the target. When the target is
        ignoring the sender, the message is not delivered to the target but
        the sender still sees the normal confirmation line.
        """
        pieces = args.split(None, 1)
        if len(pieces) < 2:
            await self.send_to_player(player_id, "Usage: /tell <player> <message>")
            return

        target_name, message = pieces
        target_id = await self.find_player_by_name(target_name)
        if not target_id:
            await self.send_to_player(player_id, f"Player '{target_name}' not found.")
            return

        if self._is_ignoring(player_id, target_id):
            await self.send_to_player(player_id, f"You are ignoring {target_name}.")
            return

        sender_name = await self.get_player_name(player_id)
        if not self._is_ignoring(target_id, player_id):
            await self.send_to_player(target_id, f"[TELL from {sender_name}]: {message}")
        await self.send_to_player(player_id, f"[TELL to {target_name}]: {message}")

    async def whisper_command(self, player_id, args):
        """Alias for ``tell_command``."""
        await self.tell_command(player_id, args)

    async def shout_command(self, player_id, args):
        """Send ``"<name> shouts: <args>"`` to all connected players."""
        if not args:
            await self.send_to_player(player_id, "Usage: /shout <message>")
            return

        player_name = await self.get_player_name(player_id)
        await self._announce(player_id, f"{player_name} shouts: {args}")

    async def emote_command(self, player_id, args):
        """Send ``"* <name> <args>"`` to all connected players."""
        if not args:
            await self.send_to_player(player_id, "Usage: /emote <action> or /me <action>")
            return

        player_name = await self.get_player_name(player_id)
        await self._announce(player_id, f"* {player_name} {args}")

    # ------------------------------------------------------------------
    # Informational commands
    # ------------------------------------------------------------------

    async def who_command(self, player_id, args):
        """Show the names of connected players."""
        player_names = [await self.get_player_name(pid) for pid in self._connected_players()]

        if player_names:
            message = f"Online players ({len(player_names)}): {', '.join(player_names)}"
        else:
            message = "No players currently online."

        await self.send_to_player(player_id, message)

    async def time_command(self, player_id, args):
        """Show the local wall-clock time as ``HH:MM:SS``."""
        import datetime

        current_time = datetime.datetime.now().strftime("%H:%M:%S")
        await self.send_to_player(player_id, f"Current time: {current_time}")

    async def help_command(self, player_id, args):
        """Send the fixed command summary, one line per command."""
        commands = [
            "/tell <player> <msg> - Send private message",
            "/whisper <player> <msg> - Same as /tell",
            "/shout <message> - Shout to all players",
            "/emote <action> - Perform an action",
            "/me <action> - Same as /emote",
            "/who - Show online players",
            "/time - Show current time",
            "/help - Show this help",
            "/clear - Clear your chat",
            "/ignore <player> - Ignore a player",
            "/unignore <player> - Stop ignoring a player",
            "/history - Show recent chat history",
            "/channels - Show available channels",
            "/join <channel> - Join a channel",
            "/leave <channel> - Leave a channel",
            "/channel <channel> <msg> - Send message to channel",
            "/c <channel> <msg> - Same as /channel",
            "/mail <player> <msg> - Send mail to player",
            "/read - Read your mail",
            "/mailbox - Check mailbox status",
            "/marketplace - View marketplace listings",
            "/trade create <item> <price> - Create trade offer",
            "/trade accept <id> - Accept trade offer",
            "/auction create <item> <start_price> - Create auction",
        ]

        await self.send_to_player(player_id, "Available commands:")
        for cmd in commands:
            await self.send_to_player(player_id, f" {cmd}")

    async def clear_command(self, player_id, args):
        """Push 20 newlines to the player, then a confirmation line."""
        await self.send_to_player(player_id, "\n" * 20)
        await self.send_to_player(player_id, "Chat cleared.")

    # ------------------------------------------------------------------
    # Ignore lists and history
    # ------------------------------------------------------------------

    async def ignore_command(self, player_id, args):
        """Add the named player to the caller's ignore list."""
        target_name = args.strip()
        if not target_name:
            await self.send_to_player(player_id, "Usage: /ignore <player>")
            return

        target_id = await self.find_player_by_name(target_name)
        if not target_id:
            await self.send_to_player(player_id, f"Player '{target_name}' not found.")
            return

        if target_id == player_id:
            await self.send_to_player(player_id, "You cannot ignore yourself.")
            return

        self.ignore_lists.setdefault(player_id, set()).add(target_id)
        await self.send_to_player(player_id, f"You are now ignoring {target_name}.")

    async def unignore_command(self, player_id, args):
        """Remove the named player from the caller's ignore list."""
        target_name = args.strip()
        if not target_name:
            await self.send_to_player(player_id, "Usage: /unignore <player>")
            return

        target_id = await self.find_player_by_name(target_name)
        if not target_id:
            await self.send_to_player(player_id, f"Player '{target_name}' not found.")
            return

        ignored = self.ignore_lists.get(player_id)
        if ignored is not None and target_id in ignored:
            ignored.remove(target_id)
            await self.send_to_player(player_id, f"You are no longer ignoring {target_name}.")
        else:
            await self.send_to_player(player_id, f"You are not ignoring {target_name}.")

    async def history_command(self, player_id, args):
        """Send the ten most recent history lines to the player."""
        if not self.chat_history:
            await self.send_to_player(player_id, "No chat history available.")
            return

        await self.send_to_player(player_id, "Recent chat history:")
        for entry in self.chat_history[-10:]:
            await self.send_to_player(player_id, entry['message'])

    # ------------------------------------------------------------------
    # Channels
    # ------------------------------------------------------------------

    async def channels_command(self, player_id, args):
        """List each channel with its description and member count."""
        await self.send_to_player(player_id, "Available channels:")
        # Snapshot so a concurrent change cannot break the iteration.
        for channel_name, channel_info in list(self.channels.items()):
            member_count = len(channel_info['members'])
            await self.send_to_player(player_id, f" {channel_name}: {channel_info['description']} ({member_count} members)")

    async def join_channel_command(self, player_id, args):
        """Join a channel and tell its other members."""
        channel_name = args.strip().lower()
        if not channel_name:
            await self.send_to_player(player_id, "Usage: /join <channel>")
            return

        channel = self.channels.get(channel_name)
        if channel is None:
            await self.send_to_player(player_id, f"Channel '{channel_name}' does not exist.")
            return

        self.player_channels.setdefault(player_id, set()).add(channel_name)
        channel['members'].add(player_id)

        player_name = await self.get_player_name(player_id)
        await self.send_to_player(player_id, f"You have joined the '{channel_name}' channel.")

        # Iterate a copy: members may join or leave while we await.
        for member_id in list(channel['members']):
            if member_id != player_id:
                await self.send_to_player(member_id, f"{player_name} has joined the '{channel_name}' channel.")

    async def leave_channel_command(self, player_id, args):
        """Leave a channel and tell its remaining members."""
        channel_name = args.strip().lower()
        if not channel_name:
            await self.send_to_player(player_id, "Usage: /leave <channel>")
            return

        channel = self.channels.get(channel_name)
        if channel is None:
            await self.send_to_player(player_id, f"Channel '{channel_name}' does not exist.")
            return

        joined = self.player_channels.get(player_id)
        if joined is None or channel_name not in joined:
            await self.send_to_player(player_id, f"You are not in the '{channel_name}' channel.")
            return

        joined.remove(channel_name)
        channel['members'].discard(player_id)

        player_name = await self.get_player_name(player_id)
        await self.send_to_player(player_id, f"You have left the '{channel_name}' channel.")

        # Iterate a copy: members may join or leave while we await.
        for member_id in list(channel['members']):
            await self.send_to_player(member_id, f"{player_name} has left the '{channel_name}' channel.")

    async def channel_command(self, player_id, args):
        """Send ``/channel <channel> <message>`` to that channel's members.

        The sender must have joined the channel. Members who are ignoring the
        sender are skipped. Channel messages are not added to history.
        """
        pieces = args.split(None, 1)
        if len(pieces) < 2:
            await self.send_to_player(player_id, "Usage: /channel <channel> <message>")
            return

        channel_name, message = pieces
        channel_name = channel_name.lower()

        channel = self.channels.get(channel_name)
        if channel is None:
            await self.send_to_player(player_id, f"Channel '{channel_name}' does not exist.")
            return

        if channel_name not in self.player_channels.get(player_id, ()):
            await self.send_to_player(player_id, f"You are not in the '{channel_name}' channel. Use /join {channel_name} first.")
            return

        player_name = await self.get_player_name(player_id)
        formatted_message = f"[{channel_name.upper()}] {player_name}: {message}"
        await self._deliver(player_id, channel['members'], formatted_message)

    # ------------------------------------------------------------------
    # Mail (read2burn addon)
    # ------------------------------------------------------------------

    async def mail_command(self, player_id, args):
        """Send ``/mail <player> <message>`` through the read2burn addon."""
        pieces = args.split(None, 1)
        if len(pieces) < 2:
            await self.send_to_player(player_id, "Usage: /mail <player> <message>")
            return

        target_name, message = pieces
        target_id = await self.find_player_by_name(target_name)
        if not target_id:
            await self.send_to_player(player_id, f"Player '{target_name}' not found.")
            return

        try:
            read2burn_addon = self._mail_addon()
            if read2burn_addon:
                sender_name = await self.get_player_name(player_id)
                await read2burn_addon.send_message(target_id, f"Mail from {sender_name}: {message}")
                await self.send_to_player(player_id, f"Mail sent to {target_name}.")
            else:
                await self.send_to_player(player_id, "Mail system is not available.")
        except Exception as e:
            await self.send_to_player(player_id, f"Failed to send mail: {e}")

    async def read_mail_command(self, player_id, args):
        """Show the player's mail, then clear it via the read2burn addon."""
        try:
            read2burn_addon = self._mail_addon()
            if not read2burn_addon:
                await self.send_to_player(player_id, "Mail system is not available.")
                return

            messages = await read2burn_addon.get_messages(player_id)
            if not messages:
                await self.send_to_player(player_id, "You have no unread mail.")
                return

            await self.send_to_player(player_id, f"You have {len(messages)} unread message(s):")
            for number, msg in enumerate(messages, start=1):
                await self.send_to_player(player_id, f"{number}. {msg}")

            # Cleared only after every message has been sent to the player.
            await read2burn_addon.clear_messages(player_id)
        except Exception as e:
            await self.send_to_player(player_id, f"Failed to read mail: {e}")

    async def mailbox_command(self, player_id, args):
        """Report the count from the read2burn addon's ``get_message_count``."""
        try:
            read2burn_addon = self._mail_addon()
            if read2burn_addon:
                message_count = await read2burn_addon.get_message_count(player_id)
                await self.send_to_player(player_id, f"You have {message_count} unread message(s) in your mailbox.")
            else:
                await self.send_to_player(player_id, "Mail system is not available.")
        except Exception as e:
            await self.send_to_player(player_id, f"Failed to check mailbox: {e}")

    # ------------------------------------------------------------------
    # Marketplace (demo output only; nothing is stored)
    # ------------------------------------------------------------------

    async def marketplace_command(self, player_id, args):
        """Send a fixed demo list of marketplace listings."""
        demo_listings = [
            "1. Iron Sword - 100 gold (seller: Alice)",
            "2. Magic Potion - 50 gold (seller: Bob)",
            "3. Dragon Scale - 500 gold (seller: Charlie)",
            "4. Enchanted Ring - 250 gold (seller: Diana)",
        ]

        await self.send_to_player(player_id, "=== MARKETPLACE ===")
        await self.send_to_player(player_id, "Current listings:")
        for listing in demo_listings:
            await self.send_to_player(player_id, f" {listing}")
        await self.send_to_player(player_id, "Use '/trade create <item> <price>' to list an item")
        await self.send_to_player(player_id, "Use '/trade accept <id>' to purchase an item")

    async def trade_command(self, player_id, args):
        """Handle ``/trade create <item> <price>`` and ``/trade accept <id>``.

        Only a confirmation line is sent; no listing is created or bought.
        """
        pieces = args.split(None, 1)
        if not pieces:
            await self.send_to_player(player_id, "Usage: /trade <create|accept> [arguments]")
            return

        action = pieces[0].lower()
        rest = pieces[1] if len(pieces) > 1 else ""

        if action == 'create':
            parsed = self._split_item_and_price(rest)
            if parsed is None:
                await self.send_to_player(player_id, "Usage: /trade create <item> <price>")
                return
            item, price = parsed
            await self.send_to_player(player_id, f"Created trade listing: {item} for {price} gold")
        elif action == 'accept':
            listing_id = rest.strip()
            if not listing_id:
                await self.send_to_player(player_id, "Usage: /trade accept <listing_id>")
                return
            await self.send_to_player(player_id, f"Attempting to purchase listing {listing_id}...")
        else:
            await self.send_to_player(player_id, "Unknown trade action. Use 'create' or 'accept'.")

    async def auction_command(self, player_id, args):
        """Handle ``/auction create <item> <starting_price>``.

        Only a confirmation line is sent; no auction is created.
        """
        pieces = args.split(None, 1)
        if not pieces:
            await self.send_to_player(player_id, "Usage: /auction create <item> <starting_price>")
            return

        action = pieces[0].lower()
        rest = pieces[1] if len(pieces) > 1 else ""

        if action != 'create':
            await self.send_to_player(player_id, "Unknown auction action. Use 'create' to start an auction.")
            return

        parsed = self._split_item_and_price(rest)
        if parsed is None:
            await self.send_to_player(player_id, "Usage: /auction create <item> <starting_price>")
            return

        item, starting_price = parsed
        await self.send_to_player(player_id, f"Created auction: {item} starting at {starting_price} gold")

    # ------------------------------------------------------------------
    # Placeholders
    # ------------------------------------------------------------------

    async def get_player_name(self, player_id):
        """Return a display name for ``player_id``.

        Placeholder: returns ``"Player<player_id>"`` without any lookup.
        """
        return f"Player{player_id}"

    async def find_player_by_name(self, name):
        """Return the player id for ``name``, or None.

        Placeholder: always returns None here, so every command that needs a
        target player reports "not found".
        """
        return None

    async def send_to_player(self, player_id, message):
        """Send ``message`` to ``player_id``.

        Placeholder: does nothing in this class.
        """
        pass