import json import os from pathlib import Path from datetime import datetime, timedelta from typing import Dict, List, Optional, Any import shutil from core.game_mechanics import Monster class StateManager: """Manages persistent state for users and monsters""" def __init__(self, data_dir: Path): self.data_dir = Path(data_dir) self.users_dir = self.data_dir / "users" self.monsters_dir = self.data_dir / "monsters" self.cache_dir = self.data_dir / "cache" # Create directories if they don't exist for dir_path in [self.users_dir, self.monsters_dir, self.cache_dir]: dir_path.mkdir(parents=True, exist_ok=True) # In-memory cache for active sessions self.active_sessions = {} self.last_save_time = {} def get_user_dir(self, user_id: str) -> Path: """Get or create user directory""" user_dir = self.users_dir / user_id user_dir.mkdir(exist_ok=True) return user_dir def save_monster(self, user_id: str, monster: Monster) -> bool: """Save monster to persistent storage""" try: user_dir = self.get_user_dir(user_id) # Save monster data monster_file = user_dir / f"monster_{monster.name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" with open(monster_file, 'w') as f: json.dump(monster.to_dict(), f, indent=2) # Update current monster reference current_file = user_dir / "current_monster.json" current_data = { 'monster_file': str(monster_file.name), 'monster_name': monster.name, 'last_updated': datetime.now().isoformat() } with open(current_file, 'w') as f: json.dump(current_data, f, indent=2) # Update user profile self._update_user_profile(user_id, monster) # Cache in memory self.active_sessions[user_id] = { 'monster': monster, 'last_access': datetime.now() } return True except Exception as e: print(f"Error saving monster: {e}") return False def get_current_monster(self, user_id: str) -> Optional[Monster]: """Get the current active monster for a user""" # Check memory cache first if user_id in self.active_sessions: session = self.active_sessions[user_id] if datetime.now() - session['last_access'] < timedelta(minutes=30): session['last_access'] = datetime.now() return session['monster'] # Load from disk try: user_dir = self.get_user_dir(user_id) current_file = user_dir / "current_monster.json" if not current_file.exists(): return None with open(current_file, 'r') as f: current_data = json.load(f) monster_file = user_dir / current_data['monster_file'] if not monster_file.exists(): return None with open(monster_file, 'r') as f: monster_data = json.load(f) monster = Monster.from_dict(monster_data) # Update cache self.active_sessions[user_id] = { 'monster': monster, 'last_access': datetime.now() } return monster except Exception as e: print(f"Error loading monster: {e}") return None def update_monster(self, user_id: str, monster: Monster) -> bool: """Update existing monster data""" # Update in memory if user_id in self.active_sessions: self.active_sessions[user_id]['monster'] = monster self.active_sessions[user_id]['last_access'] = datetime.now() # Save periodically (every 5 minutes) or if important changes should_save = False current_time = datetime.now() if user_id not in self.last_save_time: should_save = True else: time_since_save = current_time - self.last_save_time[user_id] if time_since_save > timedelta(minutes=5): should_save = True # Always save on evolution or critical states if monster.care_state['health'] < 30 or monster.care_state['hunger'] < 20: should_save = True if should_save: self.last_save_time[user_id] = current_time return self.save_monster(user_id, monster) return True def get_user_monsters(self, user_id: str) -> List[Dict[str, Any]]: """Get all monsters for a user""" try: user_dir = self.get_user_dir(user_id) monsters = [] for file_path in user_dir.glob("monster_*.json"): if file_path.name != "current_monster.json": with open(file_path, 'r') as f: monster_data = json.load(f) monsters.append({ 'file': file_path.name, 'name': monster_data.get('name'), 'species': monster_data.get('species'), 'stage': monster_data.get('stage'), 'birth_time': monster_data.get('birth_time') }) # Sort by birth time (newest first) monsters.sort(key=lambda x: x['birth_time'], reverse=True) return monsters except Exception as e: print(f"Error getting user monsters: {e}") return [] def _update_user_profile(self, user_id: str, monster: Monster): """Update user profile with monster statistics""" try: user_dir = self.get_user_dir(user_id) profile_file = user_dir / "profile.json" # Load existing profile or create new if profile_file.exists(): with open(profile_file, 'r') as f: profile = json.load(f) else: profile = { 'user_id': user_id, 'created': datetime.now().isoformat(), 'monsters_created': 0, 'total_training_sessions': 0, 'achievements': [] } # Update statistics profile['monsters_created'] = profile.get('monsters_created', 0) + 1 profile['last_active'] = datetime.now().isoformat() profile['current_monster'] = monster.name # Check for achievements new_achievements = self._check_achievements(profile, monster) profile['achievements'].extend(new_achievements) # Save profile with open(profile_file, 'w') as f: json.dump(profile, f, indent=2) except Exception as e: print(f"Error updating user profile: {e}") def _check_achievements(self, profile: Dict, monster: Monster) -> List[Dict[str, Any]]: """Check for new achievements""" achievements = [] current_achievements = {a['id'] for a in profile.get('achievements', [])} # First monster achievement if profile['monsters_created'] == 1 and 'first_monster' not in current_achievements: achievements.append({ 'id': 'first_monster', 'name': 'Digital Pioneer', 'description': 'Created your first digital monster', 'icon': '🥇', 'unlocked': datetime.now().isoformat() }) # Multiple monsters achievement if profile['monsters_created'] == 5 and 'monster_collector' not in current_achievements: achievements.append({ 'id': 'monster_collector', 'name': 'Monster Collector', 'description': 'Created 5 digital monsters', 'icon': '🏆', 'unlocked': datetime.now().isoformat() }) # Perfect care achievement if all(monster.care_state[stat] >= 90 for stat in ['hunger', 'happiness', 'health']): if 'perfect_care' not in current_achievements: achievements.append({ 'id': 'perfect_care', 'name': 'Perfect Caretaker', 'description': 'Achieved perfect care status', 'icon': '💖', 'unlocked': datetime.now().isoformat() }) return achievements def get_user_profile(self, user_id: str) -> Optional[Dict[str, Any]]: """Get user profile""" try: user_dir = self.get_user_dir(user_id) profile_file = user_dir / "profile.json" if profile_file.exists(): with open(profile_file, 'r') as f: return json.load(f) return None except Exception as e: print(f"Error loading user profile: {e}") return None def cleanup_old_sessions(self): """Clean up old sessions from memory""" current_time = datetime.now() expired_users = [] for user_id, session in self.active_sessions.items(): if current_time - session['last_access'] > timedelta(hours=1): expired_users.append(user_id) for user_id in expired_users: # Save before removing from cache if 'monster' in self.active_sessions[user_id]: self.save_monster(user_id, self.active_sessions[user_id]['monster']) del self.active_sessions[user_id] def export_user_data(self, user_id: str) -> Optional[str]: """Export all user data as a zip file""" try: user_dir = self.get_user_dir(user_id) export_path = self.cache_dir / f"export_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" # Create zip archive shutil.make_archive(str(export_path), 'zip', user_dir) return f"{export_path}.zip" except Exception as e: print(f"Error exporting user data: {e}") return None