# digiPal/core/state_manager.py
# Committed by BladeSzaSza
# Commit: fe24641 ("new design")
import json
import os
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
import shutil
from core.game_mechanics import Monster
class StateManager:
    """Manages persistent state for users and monsters.

    Files written under ``data_dir``::

        users/<user_id>/monster_<name>_<YYYYmmdd_HHMMSS>.json  monster snapshots
        users/<user_id>/current_monster.json                   pointer to latest snapshot
        users/<user_id>/profile.json                           statistics and achievements
        monsters/                                              created, not otherwise used by this class
        cache/                                                 export archives

    Recently used monsters are additionally held in an in-memory session
    cache. Public methods report failures through their return value
    (``False`` / ``None`` / ``[]``) after printing the error; they do not
    raise, with the exception of :meth:`get_user_dir`.
    """

    # How long a cached monster is served without re-reading it from disk.
    SESSION_CACHE_TTL = timedelta(minutes=30)
    # Idle time after which cleanup_old_sessions() evicts a session.
    SESSION_EXPIRY = timedelta(hours=1)
    # Minimum time between periodic saves triggered by update_monster().
    SAVE_INTERVAL = timedelta(minutes=5)
    # Care-state values below these thresholds force an immediate save.
    CRITICAL_HEALTH = 30
    CRITICAL_HUNGER = 20
    # hunger, happiness and health must all reach this for "perfect care".
    PERFECT_CARE_THRESHOLD = 90

    def __init__(self, data_dir: Path):
        """Create the storage directories and the empty in-memory caches.

        Args:
            data_dir: Root directory for all persisted state. It and its
                ``users``/``monsters``/``cache`` sub-directories are created
                if missing.
        """
        self.data_dir = Path(data_dir)
        self.users_dir = self.data_dir / "users"
        self.monsters_dir = self.data_dir / "monsters"
        self.cache_dir = self.data_dir / "cache"

        # Create directories if they don't exist
        for dir_path in (self.users_dir, self.monsters_dir, self.cache_dir):
            dir_path.mkdir(parents=True, exist_ok=True)

        # In-memory cache for active sessions:
        # user_id -> {'monster': Monster, 'last_access': datetime}
        self.active_sessions: Dict[str, Dict[str, Any]] = {}
        # user_id -> time of the last successful save made by update_monster()
        self.last_save_time: Dict[str, datetime] = {}

    @staticmethod
    def _read_json(path: Path) -> Any:
        """Load and return the JSON document stored at ``path``."""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def _write_json(path: Path, payload: Any) -> None:
        """Write ``payload`` as JSON to ``path`` without leaving a partial file.

        The document is written to a sibling ``*.tmp`` file first and then
        moved over the target, so a failure during serialisation cannot
        truncate an existing file.
        """
        tmp_path = path.with_name(path.name + ".tmp")
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(payload, f, indent=2)
            os.replace(tmp_path, path)
        except Exception:
            if tmp_path.exists():
                tmp_path.unlink()
            raise

    @staticmethod
    def _safe_filename_part(text: Any) -> str:
        """Return ``text`` with characters unsafe in a file name replaced by ``_``."""
        cleaned = "".join(
            ch if ch.isalnum() or ch in "._- " else "_" for ch in str(text)
        )
        return cleaned or "unnamed"

    def get_user_dir(self, user_id: str) -> Path:
        """Get or create the directory that holds one user's files.

        Args:
            user_id: Identifier used verbatim as the directory name.

        Returns:
            Path of the (now existing) user directory.

        Raises:
            ValueError: If ``user_id`` is empty, is ``.``/``..`` or contains
                a path separator or NUL. Such ids would resolve outside of,
                or onto, the shared ``users`` directory.
        """
        if (
            not user_id
            or user_id in (".", "..")
            or "/" in user_id
            or "\\" in user_id
            or "\x00" in user_id
        ):
            raise ValueError(f"Invalid user id: {user_id!r}")
        user_dir = self.users_dir / user_id
        user_dir.mkdir(parents=True, exist_ok=True)
        return user_dir

    def save_monster(self, user_id: str, monster: "Monster") -> bool:
        """Save monster to persistent storage.

        Writes a new timestamped snapshot, repoints ``current_monster.json``
        at it, refreshes the user profile and caches the monster in memory.

        Args:
            user_id: Owner of the monster.
            monster: Object providing ``name``, ``to_dict()`` and
                ``care_state``.

        Returns:
            True on success, False if any step failed (the error is printed).
        """
        try:
            user_dir = self.get_user_dir(user_id)
            now = datetime.now()

            # Save monster data. The name is sanitised for the file name
            # only; the stored data keeps the original name.
            safe_name = self._safe_filename_part(monster.name)
            monster_file = user_dir / f"monster_{safe_name}_{now.strftime('%Y%m%d_%H%M%S')}.json"
            self._write_json(monster_file, monster.to_dict())

            # Update current monster reference
            self._write_json(user_dir / "current_monster.json", {
                'monster_file': monster_file.name,
                'monster_name': monster.name,
                'last_updated': now.isoformat(),
            })

            # Update user profile
            self._update_user_profile(user_id, monster)

            # Cache in memory
            self.active_sessions[user_id] = {
                'monster': monster,
                'last_access': datetime.now(),
            }
            return True
        except Exception as e:
            print(f"Error saving monster: {e}")
            return False

    def get_current_monster(self, user_id: str) -> Optional["Monster"]:
        """Get the current active monster for a user.

        Serves the in-memory copy while it was accessed within
        ``SESSION_CACHE_TTL``; otherwise loads the snapshot referenced by
        ``current_monster.json`` and caches it.

        Returns:
            The monster, or None if the user has none or loading failed.
        """
        # Check memory cache first
        session = self.active_sessions.get(user_id)
        if session is not None:
            if datetime.now() - session['last_access'] < self.SESSION_CACHE_TTL:
                session['last_access'] = datetime.now()
                return session['monster']

        # Load from disk
        try:
            user_dir = self.get_user_dir(user_id)
            current_file = user_dir / "current_monster.json"
            if not current_file.exists():
                return None

            current_data = self._read_json(current_file)
            # Only the file name is honoured so the pointer cannot lead
            # outside the user's directory.
            monster_file = user_dir / Path(current_data['monster_file']).name
            if not monster_file.exists():
                return None

            monster = Monster.from_dict(self._read_json(monster_file))

            # Update cache
            self.active_sessions[user_id] = {
                'monster': monster,
                'last_access': datetime.now(),
            }
            return monster
        except Exception as e:
            print(f"Error loading monster: {e}")
            return None

    def update_monster(self, user_id: str, monster: "Monster") -> bool:
        """Update existing monster data.

        The monster is always stored in the session cache. It is written to
        disk only when no save has been recorded for the user, the last one
        is older than ``SAVE_INTERVAL``, or health/hunger are below their
        critical thresholds.

        Returns:
            True if no save was needed or the save succeeded, else False.
        """
        now = datetime.now()

        # Always keep the newest state in memory, even when the user has no
        # session yet; otherwise an update that is not due for saving would
        # be lost entirely.
        self.active_sessions[user_id] = {'monster': monster, 'last_access': now}

        # Save periodically or when the monster is in a critical state
        last_saved = self.last_save_time.get(user_id)
        save_due = last_saved is None or now - last_saved > self.SAVE_INTERVAL
        critical = (
            monster.care_state['health'] < self.CRITICAL_HEALTH
            or monster.care_state['hunger'] < self.CRITICAL_HUNGER
        )
        if not (save_due or critical):
            return True

        saved = self.save_monster(user_id, monster)
        if saved:
            # Recorded only on success so a failed save is retried on the
            # next update instead of after a full interval.
            self.last_save_time[user_id] = now
        return saved

    def get_user_monsters(self, user_id: str) -> List[Dict[str, Any]]:
        """Get all monsters for a user.

        Every ``monster_*.json`` snapshot in the user's directory yields one
        entry, so a monster saved several times appears several times.
        Unreadable or malformed snapshots are skipped.

        Returns:
            Summaries with keys ``file``, ``name``, ``species``, ``stage`` and
            ``birth_time``, newest ``birth_time`` first (entries without one
            last); ``[]`` on error.
        """
        try:
            user_dir = self.get_user_dir(user_id)
            monsters = []
            for file_path in user_dir.glob("monster_*.json"):
                try:
                    monster_data = self._read_json(file_path)
                except (OSError, ValueError) as e:
                    # One damaged snapshot must not hide all the others.
                    print(f"Error reading monster file {file_path.name}: {e}")
                    continue
                if not isinstance(monster_data, dict):
                    continue
                monsters.append({
                    'file': file_path.name,
                    'name': monster_data.get('name'),
                    'species': monster_data.get('species'),
                    'stage': monster_data.get('stage'),
                    'birth_time': monster_data.get('birth_time'),
                })

            # Sort by birth time (newest first). The leading flag keeps a
            # missing birth_time from being compared against real values.
            monsters.sort(
                key=lambda m: (m['birth_time'] is not None, m['birth_time']),
                reverse=True,
            )
            return monsters
        except Exception as e:
            print(f"Error getting user monsters: {e}")
            return []

    def _update_user_profile(self, user_id: str, monster: "Monster"):
        """Update user profile with monster statistics.

        Creates ``profile.json`` on first use, refreshes ``last_active`` and
        ``current_monster`` and appends newly unlocked achievements. Errors
        are printed and swallowed.
        """
        try:
            user_dir = self.get_user_dir(user_id)
            profile_file = user_dir / "profile.json"

            # Load existing profile or create new
            if profile_file.exists():
                profile = self._read_json(profile_file)
            else:
                profile = {
                    'user_id': user_id,
                    'created': datetime.now().isoformat(),
                    'monsters_created': 0,
                    'total_training_sessions': 0,
                    'achievements': [],
                }

            # Update statistics. This method runs on every save, including
            # the periodic re-saves of the same monster, so the creation
            # counter only advances when the saved monster changes.
            # NOTE(review): assumes the name identifies a monster - confirm.
            is_new_monster = (
                'current_monster' not in profile
                or profile['current_monster'] != monster.name
            )
            created = profile.get('monsters_created', 0)
            if is_new_monster:
                created += 1
            profile['monsters_created'] = created
            profile['last_active'] = datetime.now().isoformat()
            profile['current_monster'] = monster.name

            # Check for achievements
            new_achievements = self._check_achievements(profile, monster)
            profile.setdefault('achievements', []).extend(new_achievements)

            # Save profile
            self._write_json(profile_file, profile)
        except Exception as e:
            print(f"Error updating user profile: {e}")

    def _check_achievements(self, profile: Dict, monster: "Monster") -> List[Dict[str, Any]]:
        """Check for new achievements.

        Args:
            profile: Profile dict with an up-to-date ``monsters_created``.
            monster: Monster whose ``care_state`` is inspected.

        Returns:
            Achievement records not yet present in ``profile``; the profile
            itself is not modified.
        """
        unlocked_ids = {a.get('id') for a in profile.get('achievements', [])}
        created = profile.get('monsters_created', 0)
        achievements = []

        def unlock(achievement_id: str, name: str, description: str, icon: str) -> None:
            achievements.append({
                'id': achievement_id,
                'name': name,
                'description': description,
                'icon': icon,
                'unlocked': datetime.now().isoformat(),
            })

        # First monster achievement
        if created == 1 and 'first_monster' not in unlocked_ids:
            unlock('first_monster', 'Digital Pioneer',
                   'Created your first digital monster',
                   '\U0001F947')  # first-place medal

        # Multiple monsters achievement
        if created == 5 and 'monster_collector' not in unlocked_ids:
            unlock('monster_collector', 'Monster Collector',
                   'Created 5 digital monsters',
                   '\U0001F3C6')  # trophy

        # Perfect care achievement
        if 'perfect_care' not in unlocked_ids and all(
            monster.care_state[stat] >= self.PERFECT_CARE_THRESHOLD
            for stat in ('hunger', 'happiness', 'health')
        ):
            unlock('perfect_care', 'Perfect Caretaker',
                   'Achieved perfect care status',
                   '\U0001F496')  # sparkling heart

        return achievements

    def get_user_profile(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Get user profile.

        Returns:
            The parsed ``profile.json``, or None if it does not exist or
            could not be read.
        """
        try:
            profile_file = self.get_user_dir(user_id) / "profile.json"
            if profile_file.exists():
                return self._read_json(profile_file)
            return None
        except Exception as e:
            print(f"Error loading user profile: {e}")
            return None

    def cleanup_old_sessions(self):
        """Clean up old sessions from memory.

        Sessions idle for longer than ``SESSION_EXPIRY`` are saved to disk
        and then removed from the cache together with their save timestamp.
        """
        current_time = datetime.now()
        # Collected first: the cache must not change size while iterated.
        expired_users = [
            user_id
            for user_id, session in self.active_sessions.items()
            if current_time - session['last_access'] > self.SESSION_EXPIRY
        ]

        for user_id in expired_users:
            # Save before removing from cache
            session = self.active_sessions[user_id]
            if 'monster' in session:
                self.save_monster(user_id, session['monster'])
            # save_monster() re-inserts the session, so evict afterwards.
            self.active_sessions.pop(user_id, None)
            self.last_save_time.pop(user_id, None)

    def export_user_data(self, user_id: str) -> Optional[str]:
        """Export all user data as a zip file.

        Returns:
            Path of the archive created in the cache directory, or None on
            failure.
        """
        try:
            user_dir = self.get_user_dir(user_id)
            export_path = self.cache_dir / f"export_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

            # Create zip archive
            shutil.make_archive(str(export_path), 'zip', user_dir)
            return f"{export_path}.zip"
        except Exception as e:
            print(f"Error exporting user data: {e}")
            return None