|
"""
|
|
NPC management service implementation.
|
|
"""
|
|
|
|
import random
|
|
import time
|
|
import threading
|
|
import os
|
|
from typing import Dict, Optional, List
|
|
from ..interfaces.service_interfaces import INPCService
|
|
from ..interfaces.game_interfaces import IGameWorld
|
|
from ..interfaces.npc_interfaces import INPCBehavior
|
|
|
|
|
|
class DonaldBehavior(INPCBehavior):
    """Deal maker NPC behavior with funny responses.

    Replies are canned lines chosen at random. The list a reply is drawn
    from depends on keywords found in the player's message; the topics are
    checked in a fixed order (trade, greeting, help, wall, media) and the
    first match wins. Messages matching no topic get a line from
    ``self.responses``.
    """

    # Topic keywords. These are matched as substrings of the lower-cased
    # message, so inflected forms such as "buying" or "items" also match.
    _TRADE_KEYWORDS = ('buy', 'sell', 'trade', 'shop', 'item')
    _HELP_KEYWORDS = ('help', 'assist', 'support')
    _WALL_KEYWORDS = ('wall', 'build', 'construction')
    _MEDIA_KEYWORDS = ('fake', 'news', 'media')

    # Greeting keywords are matched against whole words only: "hi" and "hey"
    # are substrings of very common words ("this", "anything", "they"), and a
    # substring test would answer those messages with a shop greeting.
    _GREETING_WORDS = frozenset({'hello', 'hi', 'hey', 'greetings'})

    _TRADE_RESPONSES = (
        "You want to make a deal? I love deals! What do you got?",
        "Trading is my specialty. I literally wrote 'The Art of the Deal'!",
        "Let me tell you about my inventory - it's incredible, absolutely incredible!",
        "Nobody, and I mean NOBODY, has better prices than me!",
        "This is the best trade you'll ever make, believe me!",
    )

    _HELP_RESPONSES = (
        "Help? I don't need help! But I'll help you because I'm generous.",
        "The best help you can get is right here, from me, Donald!",
        "I'm always helping people. It's what I do. I'm very helpful.",
        "You came to the right NPC for help. The absolute right NPC!",
    )

    def __init__(self):
        # General-purpose replies used when no topic keyword matches.
        self.responses = [
            "Listen, I make the best deals. Nobody makes deals like me, believe me!",
            "This game is tremendous, absolutely tremendous. I helped design it, you know.",
            "I have the best NPCs, the most fantastic NPCs you've ever seen!",
            "Fake news! I never said that. Well, maybe I did, but it was taken out of context.",
            "You know what? You're fired! Just kidding, I can't fire you here.",
            "I built a wall around my shop. Mexico paid for it. Well, not really, but...",
            "My inventory is huge, absolutely huge. Some say it's the biggest ever.",
            "I tweeted about this earlier. Did you see my tweet? It was perfect.",
            "Nobody knows trading like I know trading. I wrote the book on it!",
            "This is a witch hunt! I've done nothing wrong! Perfect conversation!",
            "I have tremendous respect for you, tremendous. But you're wrong.",
            "Covfefe! Wait, wrong universe. But you get the idea, right?",
            "I'm a stable genius when it comes to trading. Very stable, very genius.",
            "That's fake gold! I only deal in real gold. 24 karat, the best gold.",
            "I love all my customers equally. Except the ones who don't buy anything.",
        ]

        # Replies used for greetings (both get_greeting and greeting messages).
        self.greetings = [
            "Welcome to Donald's Tremendous Trading Post! The best shop, believe me!",
            "You've never seen deals like these. Nobody has deals like these!",
            "Step right up to the most incredible shop in the entire game world!",
            "I'm Donald, and I approve this shopping experience. It's tremendous!",
            "Welcome! I have the best items, folks. The absolute best!",
        ]

    @staticmethod
    def _contains_any(text, keywords):
        """Return True if any keyword occurs as a substring of ``text``."""
        return any(keyword in text for keyword in keywords)

    @staticmethod
    def _split_words(text):
        """Split ``text`` into alphanumeric words, discarding punctuation."""
        cleaned = "".join(ch if ch.isalnum() else " " for ch in text)
        return cleaned.split()

    def get_response(self, npc_id: str, message: str, player_id: Optional[str] = None) -> str:
        """Generate Deal Maker-style response.

        Args:
            npc_id: Identifier of the NPC being addressed (not used to pick
                the reply).
            message: The player's message. An empty or ``None`` message is
                treated as having no keywords.
            player_id: Identifier of the speaking player (unused).

        Returns:
            One reply string chosen according to the first matching topic.
        """
        message_lower = (message or "").lower()

        if self._contains_any(message_lower, self._TRADE_KEYWORDS):
            return random.choice(self._TRADE_RESPONSES)

        # Whole-word comparison; see the note on _GREETING_WORDS.
        if not self._GREETING_WORDS.isdisjoint(self._split_words(message_lower)):
            return random.choice(self.greetings)

        if self._contains_any(message_lower, self._HELP_KEYWORDS):
            return random.choice(self._HELP_RESPONSES)

        if self._contains_any(message_lower, self._WALL_KEYWORDS):
            return "I built the most beautiful wall around my shop. It's incredible!"

        if self._contains_any(message_lower, self._MEDIA_KEYWORDS):
            return "Fake news! The media is so dishonest. But you seem trustworthy!"

        return random.choice(self.responses)

    def get_greeting(self, npc_id: str, player_id: Optional[str] = None) -> str:
        """Get greeting message: a random entry of ``self.greetings``."""
        return random.choice(self.greetings)

    def can_handle(self, npc_id: str) -> bool:
        """Check if this behavior handles Donald NPC (``npc_id == "donald"``)."""
        return npc_id == "donald"
|
|
|
|
|
|
class GenericNPCBehavior(INPCBehavior):
    """Generic NPC behavior for standard NPCs.

    Each known NPC id is mapped to a personality, and every personality owns
    a list of canned lines. A reply is a random line from the list that
    belongs to the NPC's personality.
    """

    # NPC id -> personality key used to index ``personality_responses``.
    _PERSONALITY_BY_NPC = {
        'merchant': 'merchant',
        'scholar': 'wise_teacher',
        'jester': 'comedian',
        'warrior': 'tough_trainer',
        'healer': 'gentle_healer',
        'sage': 'mystical_sage',
        'wanderer': 'traveler',
    }

    # Personality assigned to NPC ids that are not in the map above.
    _DEFAULT_PERSONALITY = 'merchant'

    def __init__(self):
        lines_by_personality = {}

        lines_by_personality['merchant'] = [
            "Welcome to my shop! What would you like to buy?",
            "I have the finest items in the realm!",
            "Special discount today - 10% off all potions!",
        ]
        lines_by_personality['wise_teacher'] = [
            "Ah, a curious mind! What would you like to learn?",
            "Knowledge is the greatest treasure. Ask me anything!",
            "I have studied the ancient texts for decades.",
            "Wisdom comes through questioning. What puzzles you?",
        ]
        lines_by_personality['comedian'] = [
            "Haha! Want to hear a joke? Why don't skeletons fight? They don't have the guts!",
            "What do you call a sleeping bull? A bulldozer! *laughs*",
            "I've got jokes for days! Life's too short not to laugh!",
            "Why did the scarecrow win an award? He was outstanding in his field!",
        ]
        lines_by_personality['tough_trainer'] = [
            "Ready for combat training? Show me your stance!",
            "A true warrior trains every day. Are you committed?",
            "Strength comes from discipline and practice!",
            "The blade is an extension of your will. Focus!",
        ]
        lines_by_personality['gentle_healer'] = [
            "Blessings upon you, traveler. Do you need healing?",
            "The light guides my hands. I can mend your wounds.",
            "Health of body and spirit go hand in hand.",
            "May the divine light restore your vitality!",
        ]
        lines_by_personality['mystical_sage'] = [
            "Magic flows through all things, young one.",
            "The arcane arts require patience and understanding.",
            "Ancient powers stir... can you feel them?",
            "Wisdom and magic are closely intertwined.",
        ]
        lines_by_personality['traveler'] = [
            "The road calls to me... always moving, always exploring!",
            "I've seen wonders beyond imagination in my travels.",
            "Adventure awaits around every corner!",
            "Sometimes the journey is more important than the destination.",
        ]

        self.personality_responses = lines_by_personality

    def get_response(self, npc_id: str, message: str, player_id: str = None) -> str:
        """Generate generic NPC response based on personality.

        ``message`` and ``player_id`` do not influence the reply; only
        ``npc_id`` selects the pool of lines a random reply is drawn from.
        """
        persona = self._PERSONALITY_BY_NPC.get(npc_id, self._DEFAULT_PERSONALITY)

        # Lines used only when the personality has no entry in
        # ``personality_responses``.
        stock_lines = [
            "Hello there, traveler!",
            "How can I help you?",
            "Nice weather we're having.",
        ]
        candidate_lines = self.personality_responses.get(persona, stock_lines)

        return random.choice(candidate_lines)

    def get_greeting(self, npc_id: str, player_id: str = None) -> str:
        """Get greeting message by answering a literal "hello" message."""
        greeting = self.get_response(npc_id, "hello", player_id)
        return greeting

    def can_handle(self, npc_id: str) -> bool:
        """This behavior can handle any NPC."""
        return True
|
|
|
|
|
|
class NPCService(INPCService):
    """Service for managing NPCs in the game world.

    Responsibilities visible in this class:

    * dispatching player messages and greetings to the first registered
      behavior whose ``can_handle`` accepts the NPC id;
    * registering, looking up and repositioning NPCs stored in the game
      world (through ``add_npc`` / ``get_npc`` / ``get_all_npcs`` when the
      world provides them, otherwise through its ``npcs`` attribute);
    * running a daemon thread that periodically moves NPCs whose ``type`` is
      ``'moving'``.

    Errors are reported with ``print`` and turned into a neutral return
    value (``False``, ``None``, ``{}`` or a fallback string).
    """

    # Seconds between movement ticks, and back-off after a failed tick.
    _TICK_SECONDS = 2.0
    _ERROR_BACKOFF_SECONDS = 5.0
    # Distance an NPC travels per step along each axis.
    _MOVE_DISTANCE = 25
    # A player closer than this to a moved NPC triggers a "notices" event.
    _NOTICE_RADIUS = 50
    # World bounds used when the game world does not expose its own.
    _DEFAULT_WORLD_WIDTH = 475
    _DEFAULT_WORLD_HEIGHT = 375

    def __init__(self, game_world: IGameWorld):
        """Create the service and, outside of tests, start NPC movement.

        Args:
            game_world: World object that stores NPCs and players.
        """
        self.game_world = game_world
        # Order matters: the first behavior that can handle an NPC wins, so
        # the catch-all GenericNPCBehavior comes last.
        self.behaviors = [
            DonaldBehavior(),
            GenericNPCBehavior(),
        ]

        self._movement_thread = None
        self._movement_active = False
        # Set to wake and terminate the current movement thread.
        self._stop_event = threading.Event()

        # Do not spawn the background thread under pytest or for worlds
        # flagged as test instances.
        if not os.environ.get('PYTEST_CURRENT_TEST') and not hasattr(game_world, '_is_test_instance'):
            self.start_movement_system()

    def start_movement_system(self):
        """Start the NPC movement system for moving NPCs like Roaming Rick.

        Does nothing when a movement thread is already running.
        """
        if self._movement_thread is not None and self._movement_thread.is_alive():
            return

        # A fresh event per thread: a previous thread that is still winding
        # down keeps its own (already set) event and cannot be revived by
        # this restart.
        self._stop_event = threading.Event()
        self._movement_active = True
        self._movement_thread = threading.Thread(
            target=self._movement_loop,
            args=(self._stop_event,),
            daemon=True,
        )
        self._movement_thread.start()

    def stop_movement_system(self):
        """Stop the NPC movement system.

        Signals the movement thread and waits up to one second for it to
        finish.
        """
        self._movement_active = False
        self._stop_event.set()

        thread = self._movement_thread
        # A thread cannot join itself; skip the join if we are that thread.
        if thread is not None and thread.is_alive() and thread is not threading.current_thread():
            thread.join(timeout=1.0)
        self._movement_thread = None

    def _movement_loop(self, stop_event=None):
        """Main movement loop for updating moving NPCs.

        Args:
            stop_event: Event that ends the loop when set. Defaults to the
                service's current stop event so the method can still be
                called without arguments.
        """
        if stop_event is None:
            stop_event = self._stop_event

        while self._movement_active and not stop_event.is_set():
            delay = self._TICK_SECONDS
            try:
                self.update_moving_npcs()
            except Exception as e:
                print(f"[NPCService] Movement loop error: {e}")
                # Errors mentioning "Mock" end the loop instead of retrying.
                if "Mock" in str(e):
                    print(f"[NPCService] Stopping movement loop due to mock error")
                    break
                delay = self._ERROR_BACKOFF_SECONDS

            # Interruptible pause: returns as soon as stop is requested.
            stop_event.wait(delay)

    def update_moving_npcs(self):
        """Update positions of moving NPCs like Roaming Rick.

        Every NPC is processed independently; a failure while moving one
        NPC is reported and does not prevent the others from moving.
        """
        current_time = time.time()

        try:
            npcs = self.get_all_npcs()

            if not hasattr(npcs, '__iter__'):
                print("[NPCService] Error in update_moving_npcs: npcs is not iterable")
                return

            try:
                # Snapshot the items so NPCs registered from another thread
                # during this tick cannot invalidate the iteration.
                npc_items = list(npcs.items()) if hasattr(npcs, 'items') else []
            except Exception:
                print("[NPCService] Error in update_moving_npcs: Mock object iteration failed")
                return

            for npc_id, npc in npc_items:
                try:
                    self._step_npc(npc_id, npc, current_time)
                except Exception as e:
                    print(f"[NPCService] Error in update_moving_npcs: {e}")

        except Exception as e:
            print(f"[NPCService] Error in update_moving_npcs: {e}")

    def _step_npc(self, npc_id, npc, current_time):
        """Move one NPC a single step if it is a moving NPC and is due."""
        if npc.get('type') != 'moving' or 'movement' not in npc:
            return

        movement = npc['movement']

        speed = movement.get('speed', 1)
        # A missing, zero or negative speed means the NPC stays put; this
        # also avoids dividing by zero below.
        if not speed or speed <= 0:
            return

        time_since_last_move = current_time - movement.get('last_move', 0)
        movement_interval = self._TICK_SECONDS / speed
        if time_since_last_move < movement_interval:
            return

        old_x, old_y = npc['x'], npc['y']

        world_width = getattr(self.game_world, 'world_width', self._DEFAULT_WORLD_WIDTH)
        world_height = getattr(self.game_world, 'world_height', self._DEFAULT_WORLD_HEIGHT)

        new_x, direction_x = self._advance_axis(
            old_x, movement.get('direction_x', 1), world_width)
        new_y, direction_y = self._advance_axis(
            old_y, movement.get('direction_y', 1), world_height)

        npc['x'] = new_x
        npc['y'] = new_y
        movement['direction_x'] = direction_x
        movement['direction_y'] = direction_y
        movement['last_move'] = current_time

        if old_x != new_x or old_y != new_y:
            # Fall back to the id when the NPC record carries no name.
            self._announce_move(npc.get('name', npc_id), new_x, new_y)

    def _advance_axis(self, position, direction, limit):
        """Step along one axis, reversing direction and clamping at an edge."""
        position = position + (self._MOVE_DISTANCE * direction)
        if position <= 0 or position >= limit:
            direction *= -1
            position = max(0, min(limit, position))
        return position, direction

    def _announce_move(self, npc_name, new_x, new_y):
        """Emit world events for an NPC move and for players close to it."""
        add_event = getattr(self.game_world, 'add_world_event', None)

        if add_event is not None:
            add_event(f"🚶 {npc_name} roams to ({int(new_x)}, {int(new_y)})")

        try:
            players = self.game_world.get_all_players()
            # Snapshot, for the same reason as the NPC items.
            for player in list(players.values()):
                distance = ((player.x - new_x)**2 + (player.y - new_y)**2)**0.5
                if distance < self._NOTICE_RADIUS and add_event is not None:
                    add_event(f"👀 {player.name} notices {npc_name} nearby")
        except Exception as e:
            print(f"[NPCService] Error checking player proximity: {e}")

    def get_npc_response(self, npc_id: str, message: str, player_id: Optional[str] = None) -> str:
        """Get NPC response to player message.

        Returns:
            The reply of the first behavior that can handle ``npc_id``, a
            default sentence when none can, or an apology when a behavior
            raises.
        """
        try:
            for behavior in self.behaviors:
                if behavior.can_handle(npc_id):
                    return behavior.get_response(npc_id, message, player_id)

            return "I don't understand what you're saying."

        except Exception as e:
            print(f"[NPCService] Error generating response: {e}")
            return "Sorry, I'm having trouble understanding you right now."

    def register_npc(self, npc_id: str, npc_data: Dict) -> bool:
        """Register a new NPC in the game world.

        Uses ``game_world.add_npc`` when available; otherwise stores the NPC
        in ``game_world.npcs``, creating that dict on the world if missing.

        Returns:
            True when the NPC was stored, False when storing raised.
        """
        try:
            if hasattr(self.game_world, 'add_npc'):
                self.game_world.add_npc(npc_id, npc_data)
                return True

            npcs = getattr(self.game_world, 'npcs', None)
            if npcs is None:
                # Attach the new dict to the world; writing into a temporary
                # dict would report success while discarding the NPC.
                npcs = {}
                self.game_world.npcs = npcs
            npcs[npc_id] = npc_data
            return True

        except Exception as e:
            print(f"[NPCService] Error registering NPC: {e}")
            return False

    def get_npc(self, npc_id: str) -> Optional[Dict]:
        """Get NPC data by ID, or None when it is unknown or lookup fails."""
        try:
            if hasattr(self.game_world, 'get_npc'):
                return self.game_world.get_npc(npc_id)

            npcs = getattr(self.game_world, 'npcs', {})
            return npcs.get(npc_id)

        except Exception as e:
            print(f"[NPCService] Error getting NPC: {e}")
            return None

    def update_npc_position(self, npc_id: str, x: int, y: int) -> bool:
        """Update NPC position.

        Returns:
            True when the NPC exists and was updated, False otherwise.
        """
        try:
            npc = self.get_npc(npc_id)
            # Compare against None: an NPC stored as an empty dict is still
            # a registered NPC.
            if npc is None:
                return False

            npc['x'] = x
            npc['y'] = y
            return True

        except Exception as e:
            print(f"[NPCService] Error updating NPC position: {e}")
            return False

    def get_npc_greeting(self, npc_id: str, player_id: Optional[str] = None) -> str:
        """Get greeting message from NPC.

        Returns:
            The greeting of the first behavior that can handle ``npc_id``;
            "Hello, traveler!" when none can or when a behavior raises.
        """
        try:
            for behavior in self.behaviors:
                if behavior.can_handle(npc_id):
                    return behavior.get_greeting(npc_id, player_id)
        except Exception as e:
            print(f"[NPCService] Error generating greeting: {e}")

        return "Hello, traveler!"

    def add_behavior(self, behavior: INPCBehavior) -> bool:
        """Add a new NPC behavior.

        The behavior is placed ahead of the catch-all GenericNPCBehavior so
        it gets a chance to handle its NPCs first; when no catch-all is
        registered it is appended.
        """
        try:
            for index, existing in enumerate(self.behaviors):
                if isinstance(existing, GenericNPCBehavior):
                    self.behaviors.insert(index, behavior)
                    break
            else:
                self.behaviors.append(behavior)
            return True

        except Exception as e:
            print(f"[NPCService] Error adding behavior: {e}")
            return False

    def remove_behavior(self, behavior_class: type) -> bool:
        """Remove an NPC behavior by class (all instances of that class)."""
        try:
            self.behaviors = [b for b in self.behaviors if not isinstance(b, behavior_class)]
            return True

        except Exception as e:
            print(f"[NPCService] Error removing behavior: {e}")
            return False

    def get_all_npcs(self) -> Dict[str, Dict]:
        """Get all NPCs from the game world; an empty dict on failure."""
        try:
            if hasattr(self.game_world, 'get_all_npcs'):
                return self.game_world.get_all_npcs()

            return getattr(self.game_world, 'npcs', {})

        except Exception as e:
            print(f"[NPCService] Error getting all NPCs: {e}")
            return {}
|
|
|