# FILE: modules/memory_bank_public.py (Finalized for Beta Checklist & Async Correctness)
import os
import json
import time
import asyncio
import logging
import traceback # For more detailed error logging if needed
from typing import List, Dict, Optional, Any
from json import JSONDecodeError
from datetime import datetime, timezone # For ISO timestamps
from pathlib import Path # For home directory in export
import sys # For logger setup if run standalone
import shutil # For copying the memory file during export
logger = logging.getLogger("ZOTHEOS_MemoryBank")
if not logger.handlers:
    # Configure an explicit stdout handler. Calling logging.basicConfig here as well
    # would attach a second (root) handler and double-log every message, so only
    # the explicit handler is used and propagation to the root logger is disabled.
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
# --- Define memory file paths and limits ---
try:
    # Path logic for when memory_bank_public.py is in the 'modules' folder
    # and the data directory is in the project root ('../zotheos_public_data')
    current_file_dir = os.path.dirname(os.path.abspath(__file__))
    project_root_dir = os.path.dirname(current_file_dir)  # This is ZOTHEOS_Release_Package

    # Check if running from a PyInstaller bundle
    if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
        # If bundled, the data directory is relative to sys._MEIPASS (the temp extraction folder)
        DATA_BASE_DIR = os.path.join(sys._MEIPASS, "zotheos_public_data")
    else:
        # If running as a script, the data directory is relative to the project root
        DATA_BASE_DIR = os.path.join(project_root_dir, "zotheos_public_data")

    if not os.path.exists(DATA_BASE_DIR):
        os.makedirs(DATA_BASE_DIR, exist_ok=True)
        logger.info(f"Created data base directory: {DATA_BASE_DIR}")

    MEMORY_DIR = os.path.join(DATA_BASE_DIR, "zotheos_memory")
    if not os.path.exists(MEMORY_DIR):
        os.makedirs(MEMORY_DIR, exist_ok=True)
        logger.info(f"Created memory directory: {MEMORY_DIR}")
except Exception as e:
    logger.critical(f"❌ Failed to set up base/memory directory for MemoryBank: {e}. Using fallback in user home.")
    MEMORY_DIR = os.path.join(os.path.expanduser("~"), ".zotheos", "zotheos_memory")
    os.makedirs(MEMORY_DIR, exist_ok=True)
MEMORY_FILE_PATH = os.path.join(MEMORY_DIR, "zotheos_memory.json")
MEMORY_FILE_TMP = os.path.join(MEMORY_DIR, "zotheos_memory_tmp.json")
MEMORY_SIZE_LIMIT = 1000
MEMORY_SCHEMA_VERSION = 1.2 # Bumped for metadata structure in entry
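# Illustrative on-disk layout, inferred from _save_memory / store_memory_async
# below (a sketch, not a normative spec):
# {
#   "schema_version": 1.2,
#   "entries": [
#     {
#       "id": "0",
#       "query": "...",
#       "response": "...",
#       "created_at_unix": 1700000000.0,
#       "schema_version": 1.2,
#       "metadata": {"timestamp_iso": "2023-11-14T22:13:20+00:00", "...": "..."}
#     }
#   ]
# }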
class MemoryBank:
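    """In-memory store of query/response interactions with atomic JSON persistence.

    Entries are kept both in insertion order (self.memory_list) and in an
    ID-keyed dict (self.memory_dict) for O(1) lookups. A minimal usage sketch,
    assuming an async context:

        bank = MemoryBank()
        await bank.store_memory_async("query", "response", metadata={"tier_at_interaction": "free"})
        recent = await bank.retrieve_recent_memories_async(limit=5)
    """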
    def __init__(self):
        self.memory_list: List[Dict[str, Any]] = []
        self.memory_dict: Dict[str, Dict[str, Any]] = {}  # For ID-based lookups; keys are string IDs
        self.next_id = 0
        logger.info(f"🧠 Initializing Memory Bank. Memory file: {MEMORY_FILE_PATH}")
        self._load_memory()  # Load initial state
        if self.memory_list:
            try:
                # Treat numeric string IDs as integers so max() is meaningful
                numeric_ids = [int(m.get('id', -1)) for m in self.memory_list if str(m.get('id', '')).isdigit()]
                if numeric_ids:
                    max_id = max(numeric_ids)
                    self.next_id = max(max_id + 1, len(self.memory_list))
                else:  # No numeric IDs found
                    self.next_id = len(self.memory_list)
            except ValueError:  # Fallback if int conversion fails for some reason
                self.next_id = len(self.memory_list)
            logger.info(f"Loaded {len(self.memory_list)} memories. Next ID set to {self.next_id}.")
        else:
            logger.info("Initialized with empty memory.")
    def _reset_memory_state(self):
        self.memory_list = []
        self.memory_dict = {}
        self.next_id = 0
        logger.info("Memory state has been reset.")
    def _load_memory(self):
        logger.info(f"Attempting to load memory from {MEMORY_FILE_PATH}...")
        try:
            if os.path.exists(MEMORY_FILE_PATH):
                with open(MEMORY_FILE_PATH, "r", encoding="utf-8") as file:
                    data = json.load(file)
                if isinstance(data, dict) and "entries" in data and isinstance(data["entries"], list):
                    self.memory_list = data["entries"]
                    # Rebuild the lookup dictionary, ensuring IDs are strings for keys
                    self.memory_dict = {str(m['id']): m for m in self.memory_list if m.get('id') is not None}
                    logger.info(f"✅ Successfully loaded {len(self.memory_list)} memory entries (schema version: {data.get('schema_version', 'Unknown')}).")
                elif isinstance(data, list):
                    logger.warning("Old memory format (list) detected. Converting and saving in new format.")
                    self.memory_list = data
                    self.memory_dict = {str(m.get('id', '')): m for m in self.memory_list if m.get('id') is not None}
                    self._save_memory()  # Save immediately in the new format
                else:
                    logger.warning(f"⚠️ Memory file {MEMORY_FILE_PATH} has an unexpected main structure. Resetting memory.")
                    self._reset_memory_state()
            else:
                logger.info(f"✅ No existing memory file found at {MEMORY_FILE_PATH}. Starting fresh.")
                self._reset_memory_state()
        except JSONDecodeError as e:
            logger.error(f"❌ Error decoding JSON from memory file {MEMORY_FILE_PATH}: {e}. File might be corrupted. Resetting memory.", exc_info=False)
            self._handle_corrupted_memory_file()
        except FileNotFoundError:
            logger.info(f"✅ Memory file {MEMORY_FILE_PATH} not found. Starting fresh (FileNotFound).")
            self._reset_memory_state()
        except Exception as e:
            logger.error(f"❌ Unexpected error loading memory: {e}. Resetting memory.", exc_info=True)
            self._reset_memory_state()
    def _handle_corrupted_memory_file(self):
        self._reset_memory_state()
        if os.path.exists(MEMORY_FILE_PATH):
            try:
                corrupt_backup_path = f"{MEMORY_FILE_PATH}.corrupt_{datetime.now().strftime('%Y%m%d%H%M%S')}"
                os.rename(MEMORY_FILE_PATH, corrupt_backup_path)
                logger.info(f"Backed up corrupted memory file to {corrupt_backup_path}")
            except OSError as backup_err:
                logger.error(f"Failed to backup corrupted memory file: {backup_err}")
    def _save_memory(self):  # Synchronous; run via asyncio.to_thread from save_memory_async
        logger.debug(f"Attempting atomic save to {MEMORY_FILE_PATH}...")
        try:
            data_to_save = {"schema_version": MEMORY_SCHEMA_VERSION, "entries": self.memory_list}
            # Write to a temp file first, then atomically swap it into place
            with open(MEMORY_FILE_TMP, "w", encoding="utf-8") as file:
                json.dump(data_to_save, file, indent=2)
            os.replace(MEMORY_FILE_TMP, MEMORY_FILE_PATH)
            logger.info(f"✅ Memory saved successfully ({len(self.memory_list)} entries).")
        except Exception as e:
            logger.error(f"❌ Error saving memory: {e}", exc_info=True)
        finally:
            # If os.replace succeeded, the temp file is already gone; otherwise clean it up
            if os.path.exists(MEMORY_FILE_TMP):
                try:
                    os.remove(MEMORY_FILE_TMP)
                except OSError as e_rem:
                    logger.warning(f"Could not remove temp memory file {MEMORY_FILE_TMP}: {e_rem}")
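    # Note: os.replace is atomic when source and target are on the same filesystem,
    # which holds here since both live in MEMORY_DIR. For stronger crash durability
    # one could additionally flush and fsync before the replace — a sketch of that
    # assumed extension (not part of the original flow):
    #     file.flush()
    #     os.fsync(file.fileno())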
    async def save_memory_async(self):
        logger.debug("Scheduling asynchronous memory save...")
        await asyncio.to_thread(self._save_memory)
    async def store_memory_async(self, query: str, response: str, metadata: Optional[Dict[str, Any]] = None):
        if not query or not response:
            logger.warning("⚠️ Attempted to store empty query or response. Skipping.")
            return
        try:
            current_id_num = self.next_id
            current_metadata = metadata.copy() if metadata is not None else {}  # Work with a copy
            # Ensure a proper ISO-formatted timestamp is in metadata
            current_metadata['timestamp_iso'] = datetime.now(timezone.utc).isoformat()
            memory_entry = {
                'id': str(current_id_num),
                'query': query,
                'response': response,  # The full fused output from main_fusion
                'created_at_unix': time.time(),  # Retained for fallback sorting
                'schema_version': MEMORY_SCHEMA_VERSION,
                'metadata': current_metadata  # Contains timestamp_iso and other details
            }
            self.next_id += 1
            self.memory_list.append(memory_entry)
            self.memory_dict[str(current_id_num)] = memory_entry
            logger.info(f"Stored memory entry ID {current_id_num}.")

            # Enforce the size cap by evicting the oldest entries
            removed_count = 0
            while len(self.memory_list) > MEMORY_SIZE_LIMIT:
                oldest_memory = self.memory_list.pop(0)
                oldest_id = str(oldest_memory.get('id', ''))
                if oldest_id and oldest_id in self.memory_dict:
                    del self.memory_dict[oldest_id]
                removed_count += 1
            if removed_count > 0:
                logger.info(f"Removed {removed_count} oldest entries for size limit.")

            await self.save_memory_async()
        except Exception as e:
            logger.error(f"❌ Error storing memory entry: {e}", exc_info=True)
    async def retrieve_recent_memories_async(self, limit: int = 5) -> List[Dict[str, Any]]:
        logger.debug(f"Retrieving up to {limit} recent memories, sorted.")
        if not self.memory_list:
            return []
        try:
            def get_sort_key(entry):
                ts_iso = entry.get('metadata', {}).get('timestamp_iso')
                if ts_iso:
                    try:
                        dt = datetime.fromisoformat(ts_iso.replace('Z', '+00:00'))
                        # Coerce naive timestamps to UTC so aware and naive values stay comparable
                        return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
                    except ValueError:
                        logger.warning(f"Could not parse timestamp_iso '{ts_iso}' for entry ID {entry.get('id')}. Falling back to created_at_unix.")
                # Fallback to created_at_unix if timestamp_iso is missing or unparsable
                return datetime.fromtimestamp(entry.get('created_at_unix', 0), timezone.utc)

            # Sort a copy so self.memory_list is not reordered while other operations occur
            sorted_entries = sorted(self.memory_list, key=get_sort_key, reverse=True)
            actual_limit = max(0, min(limit, len(sorted_entries)))
            recent_sorted_memories = sorted_entries[:actual_limit]
            logger.info(f"Retrieved {len(recent_sorted_memories)} recent memories (sorted).")
            return recent_sorted_memories
        except Exception as e:
            logger.error(f"❌ Error retrieving and sorting recent memories: {e}", exc_info=True)
            # Fall back to the unsorted last N if sorting fails
            return self.memory_list[-limit:][::-1] if limit > 0 and self.memory_list else []
    def load_all_memory_entries_structured(self) -> List[Dict[str, Any]]:
        logger.info(f"Loading all {len(self.memory_list)} memory entries (structured).")
        return list(self.memory_list)

    async def load_all_memory_entries_structured_async(self) -> List[Dict[str, Any]]:
        logger.info(f"Asynchronously loading all {len(self.memory_list)} memory entries (structured).")
        return list(self.memory_list)  # A direct copy is fine while everything is held in memory
    async def retrieve_memory_by_id(self, memory_id: str) -> Optional[Dict[str, Any]]:
        memory_id_str = str(memory_id)
        try:
            memory = self.memory_dict.get(memory_id_str)
            if memory:
                logger.info(f"Retrieved memory entry ID {memory_id_str}.")
            else:
                logger.warning(f"Memory ID {memory_id_str} not found in dictionary.")
            return memory
        except Exception as e:
            logger.error(f"❌ Error retrieving memory ID {memory_id_str}: {e}", exc_info=True)
            return None
    async def retrieve_last_response(self) -> Optional[str]:
        if not self.memory_list:
            logger.info("Retrieve last response: Memory list is empty.")
            return None
        try:
            last_entry = self.memory_list[-1]
            response = last_entry.get('response')
            if isinstance(response, str) and response.strip():
                logger.info("Retrieve last response: Found valid response.")
                return response
            logger.warning(f"Retrieve last response: Last entry (ID {last_entry.get('id', 'N/A')}) has empty response.")
            return None
        except Exception as e:
            logger.error(f"❌ Error retrieving last response: {e}", exc_info=True)
            return None
    async def clear_all_memory(self):
        logger.warning("Initiating complete memory wipe...")
        try:
            self._reset_memory_state()
            await self.save_memory_async()
            logger.info("✅ All memory cleared successfully.")
            return True
        except Exception as e:
            logger.error(f"❌ Error clearing memory: {e}", exc_info=True)
            return False
    async def delete_memory_by_id(self, memory_id: str):
        logger.warning(f"Attempting to delete memory ID {memory_id}...")
        memory_id_str = str(memory_id)
        try:
            if memory_id_str in self.memory_dict:
                del self.memory_dict[memory_id_str]
                self.memory_list = [m for m in self.memory_list if str(m.get('id', '')) != memory_id_str]
                await self.save_memory_async()
                logger.info(f"✅ Memory with ID {memory_id_str} deleted successfully.")
                return True
            logger.warning(f"⚠️ Memory ID {memory_id_str} not found for deletion.")
            return False
        except Exception as e:
            logger.error(f"❌ Error deleting memory ID {memory_id_str}: {e}", exc_info=True)
            return False
    async def get_memory_stats(self) -> Dict[str, Any]:
        logger.info("Calculating memory statistics...")
        stats: Dict[str, Any] = {
            'total_entries': len(self.memory_list),
            'disk_usage_mb': 0.0,
            'memory_limit': MEMORY_SIZE_LIMIT,
            'next_id': self.next_id,
            'schema_version': MEMORY_SCHEMA_VERSION,
        }
        try:
            if os.path.exists(MEMORY_FILE_PATH):
                file_size_bytes = await asyncio.to_thread(os.path.getsize, MEMORY_FILE_PATH)
                stats['disk_usage_mb'] = round(file_size_bytes / (1024 * 1024), 3)
            logger.info(f"Memory Stats: {stats}")
        except Exception as e:
            logger.error(f"❌ Error calculating memory file size: {e}", exc_info=True)
        return stats
    def export_memory_to_file_sync(self) -> Optional[str]:
        """Synchronously export the memory file. Returns the exported file path, or None on failure."""
        # NOTE (Future Personalization): Consider letting the user choose the export location via a UI dialog.
        if not os.path.exists(MEMORY_FILE_PATH):
            logger.warning("No memory file to export because it doesn't exist.")
            return None
        if not self.memory_list:
            # The file may contain only the schema wrapper; exporting an empty
            # "entries" structure is still allowed here.
            logger.warning("No memory entries to export; memory file might be empty or just schema.")
        try:
            export_dir = Path.home() / "Desktop"
            if not (export_dir.exists() and export_dir.is_dir()):
                export_dir = Path.home() / "Downloads"
                if not (export_dir.exists() and export_dir.is_dir()):
                    export_dir = Path(MEMORY_DIR)  # Final fallback
                    logger.warning(f"Desktop/Downloads not found/accessible, exporting to memory directory: {export_dir}")
            os.makedirs(export_dir, exist_ok=True)  # Ensure the export directory exists
            timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S')
            export_filename = f"zotheos_memory_export_{timestamp_str}.json"
            export_full_path = export_dir / export_filename
            shutil.copy2(MEMORY_FILE_PATH, export_full_path)
            logger.info(f"✅ Memory successfully exported to: {export_full_path}")
            return str(export_full_path)
        except Exception as e:
            logger.error(f"❌ Failed to export memory: {e}", exc_info=True)
            return None
    async def export_memory_to_file_async(self) -> Optional[str]:
        """Asynchronously export the memory file."""
        logger.info("Scheduling asynchronous memory export...")
        return await asyncio.to_thread(self.export_memory_to_file_sync)
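# A hedged usage sketch for the export API (show_export_path is a hypothetical
# UI callback, not part of this module):
#
#     path = await memory_bank.export_memory_to_file_async()
#     if path:
#         show_export_path(path)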
async def main_test():
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
    logger.info("--- MemoryBank Test ---")
    mb = MemoryBank()
    logger.info(f"Initial memory file path: {MEMORY_FILE_PATH}")
    if os.path.exists(MEMORY_FILE_PATH):
        logger.info("Memory file exists. Clearing for fresh test.")
        await mb.clear_all_memory()
    else:
        logger.info("No pre-existing memory file found for test.")
    stats1 = await mb.get_memory_stats()
    logger.info(f"Stats after potential clear: {stats1}")

    # Test store
    await mb.store_memory_async("Query 1", "Response 1", metadata={"custom_field": "value1", "tier_at_interaction": "free"})
    await asyncio.sleep(0.01)
    await mb.store_memory_async("Query 2", "Response 2", metadata={"user_token_used_prefix": "tes***", "synthesized_summary_text": "Summary for Q2"})
    await asyncio.sleep(0.01)
    await mb.store_memory_async("Query 3", "Response 3", metadata={"tier_at_interaction": "pro", "synthesized_summary_text": "Summary for Q3"})
    await asyncio.sleep(0.01)
    await mb.store_memory_async("Query 4", "Response 4", metadata={})  # No extra metadata
    await asyncio.sleep(0.01)
    await mb.store_memory_async("Query 5", "Response 5", metadata={"synthesized_summary_text": "This is summary 5."})
    await asyncio.sleep(0.01)
    await mb.store_memory_async("Query 6", "Response 6", metadata={"synthesized_summary_text": "This is summary 6, a bit longer than the preview."})
    recent_for_display = await mb.retrieve_recent_memories_async(limit=5)
    logger.info("Recent 5 (for display, newest first - should be Q6, Q5, Q4, Q3, Q2):")
    for i, item in enumerate(recent_for_display):
        ts = item.get('metadata', {}).get('timestamp_iso', item.get('created_at_unix'))
        logger.info(f" {i+1}. ID: {item.get('id')}, Timestamp: {ts}, Query: {item.get('query')[:20]}..., Summary in meta: {'synthesized_summary_text' in item.get('metadata', {})}")
    stats2 = await mb.get_memory_stats()
    logger.info(f"Memory Stats after storing: {stats2}")
    exported_file = await mb.export_memory_to_file_async()
    if exported_file:
        logger.info(f"Test export successful: {exported_file}")
    else:
        logger.error("Test export failed.")
    logger.info("--- MemoryBank Test Complete ---")
if __name__ == "__main__":
    asyncio.run(main_test())