# Spaces: Running  (hosting-page status text captured together with this file; not part of the module)
# FILE: modules/memory_bank_public.py (Finalized for Beta Checklist & Async Correctness)
import asyncio
import json
import logging
import os
import sys  # For logger setup if run standalone
import threading  # Serializes writes to the shared temp memory file
import time
import traceback  # For more detailed error logging if needed
from datetime import datetime, timezone  # For ISO timestamps
from json import JSONDecodeError
from pathlib import Path  # For home directory in export
from typing import Any, Dict, List, Optional
# --- Module logger ---
# A dedicated stdout handler is attached only the first time this module
# configures the logger, so re-imports do not stack handlers.
# NOTE(review): basicConfig() may also install a root handler; because this
# logger still propagates, records can then be printed twice - confirm
# whether that is intended.
logger = logging.getLogger("ZOTHEOS_MemoryBank")
if not logger.handlers:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s'
    )
    handler.setFormatter(formatter)
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)

# --- Define memory file paths and limits ---
try:
    # This file is expected to live in a 'modules' folder, with the data
    # directory one level up in the project root ('../zotheos_public_data').
    current_file_dir = os.path.dirname(os.path.abspath(__file__))
    project_root_dir = os.path.dirname(current_file_dir)

    # A PyInstaller bundle keeps its data next to the extracted payload
    # (sys._MEIPASS); a plain script run uses the project root instead.
    # NOTE(review): for one-file bundles sys._MEIPASS is a temporary
    # extraction folder - confirm that memory stored there is meant to be
    # short-lived.
    running_from_bundle = getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS')
    if running_from_bundle:
        DATA_BASE_DIR = os.path.join(sys._MEIPASS, "zotheos_public_data")
    else:
        DATA_BASE_DIR = os.path.join(project_root_dir, "zotheos_public_data")

    if not os.path.exists(DATA_BASE_DIR):
        os.makedirs(DATA_BASE_DIR, exist_ok=True)
        logger.info(f"Created data base directory: {DATA_BASE_DIR}")

    MEMORY_DIR = os.path.join(DATA_BASE_DIR, "zotheos_memory")
    if not os.path.exists(MEMORY_DIR):
        os.makedirs(MEMORY_DIR, exist_ok=True)
        logger.info(f"Created memory directory: {MEMORY_DIR}")
except Exception as e:
    # Anything going wrong above (permissions, read-only location, ...)
    # falls back to a per-user directory under the home folder.
    logger.critical(f"❌ Failed to setup base/memory directory for MemoryBank: {e}. Using fallback in user home.")
    MEMORY_DIR = os.path.join(os.path.expanduser("~"), ".zotheos", "zotheos_memory")
    os.makedirs(MEMORY_DIR, exist_ok=True)

# Live memory file, plus the scratch file that saves are written to before
# being swapped into place.
MEMORY_FILE_PATH = os.path.join(MEMORY_DIR, "zotheos_memory.json")
MEMORY_FILE_TMP = os.path.join(MEMORY_DIR, "zotheos_memory_tmp.json")

MEMORY_SIZE_LIMIT = 1000  # Maximum number of entries kept; oldest are evicted first
MEMORY_SCHEMA_VERSION = 1.2  # Bumped for metadata structure in entry
class MemoryBank:
    """Bounded, JSON-file-backed store of query/response memory entries.

    Entries are kept in ``memory_list`` (insertion order, oldest first) and
    mirrored in ``memory_dict`` keyed by the entry ID as a string. Mutating
    coroutines persist the whole state to ``MEMORY_FILE_PATH`` by writing
    ``MEMORY_FILE_TMP`` and swapping it into place with ``os.replace``; the
    blocking write is pushed to a worker thread via ``asyncio.to_thread``.
    """

    # Every instance writes through the same module-level temp file
    # (MEMORY_FILE_TMP) and saves run in worker threads, so two overlapping
    # saves would interleave their writes. The lock is class-level because
    # the file path is shared by all instances.
    _save_lock = threading.Lock()

    def __init__(self):
        """Load any persisted memory and derive the next free numeric ID."""
        self.memory_list: List[Dict[str, Any]] = []
        self.memory_dict: Dict[str, Dict[str, Any]] = {}  # For ID-based lookups, ID should be string
        self.next_id = 0
        logger.info(f"🧠 Initializing Memory Bank. Memory file: {MEMORY_FILE_PATH}")
        self._load_memory()  # Load initial state
        if self.memory_list:
            self.next_id = self._compute_next_id()
            logger.info(f"Loaded {len(self.memory_list)} memories. Next ID set to {self.next_id}.")
        else:
            logger.info("Initialized with empty memory.")

    def _compute_next_id(self) -> int:
        """Return an ID that is above every numeric ID in use and >= the entry count."""
        entry_count = len(self.memory_list)
        try:
            numeric_ids = [
                int(id_text)
                for id_text in (str(entry.get('id', '')) for entry in self.memory_list)
                # isdecimal() (unlike isdigit()) only accepts characters int() can parse.
                if id_text.isdecimal()
            ]
        except ValueError:  # Fallback if conversion to int fails for some reason
            return entry_count
        if not numeric_ids:  # No numeric IDs found
            return entry_count
        return max(max(numeric_ids) + 1, entry_count)

    def _reset_memory_state(self):
        """Drop all in-memory entries and restart ID numbering at 0 (no disk I/O)."""
        self.memory_list = []
        self.memory_dict = {}
        self.next_id = 0
        logger.info("Memory state has been reset.")

    def _index_entries(self, entries: List[Any]) -> None:
        """Adopt ``entries`` as the in-memory state and rebuild the ID lookup.

        Items that are not dicts are skipped with a warning, so a single
        malformed item no longer causes the whole memory to be discarded.
        """
        valid_entries = [entry for entry in entries if isinstance(entry, dict)]
        skipped = len(entries) - len(valid_entries)
        if skipped:
            logger.warning(f"⚠️ Skipped {skipped} malformed (non-object) memory entries while loading.")
        self.memory_list = valid_entries
        # Rebuild dictionary, ensuring IDs are strings for keys
        self.memory_dict = {
            str(entry['id']): entry for entry in valid_entries if entry.get('id') is not None
        }

    def _load_memory(self):
        """Populate the in-memory state from ``MEMORY_FILE_PATH``.

        Handles the current ``{"schema_version", "entries"}`` layout and the
        legacy bare-list layout (which is re-saved in the current layout).
        A missing file or unexpected structure resets to empty; a file that
        is not valid JSON is additionally renamed aside as a backup.
        """
        logger.info(f"Attempting to load memory from {MEMORY_FILE_PATH}...")
        try:
            with open(MEMORY_FILE_PATH, "r", encoding="utf-8") as file:
                data = json.load(file)
            if isinstance(data, dict) and isinstance(data.get("entries"), list):
                self._index_entries(data["entries"])
                logger.info(f"✅ Successfully loaded {len(self.memory_list)} memory entries (schema version: {data.get('schema_version', 'Unknown')}).")
            elif isinstance(data, list):
                logger.warning("Old memory format (list) detected. Converting and saving in new format.")
                self._index_entries(data)
                self._save_memory()  # Save immediately in new format
            else:
                logger.warning(f"⚠️ Memory file {MEMORY_FILE_PATH} has an unexpected main structure. Resetting memory.")
                self._reset_memory_state()
        except FileNotFoundError:
            logger.info(f"✅ No existing memory file found at {MEMORY_FILE_PATH}. Starting fresh.")
            self._reset_memory_state()
        except JSONDecodeError as e:
            logger.error(f"❌ Error decoding JSON from memory file {MEMORY_FILE_PATH}: {e}. File might be corrupted. Resetting memory.", exc_info=False)
            self._handle_corrupted_memory_file()
        except Exception as e:
            logger.error(f"❌ Unexpected error loading memory: {e}. Resetting memory.", exc_info=True)
            self._reset_memory_state()

    def _handle_corrupted_memory_file(self):
        """Reset the in-memory state and rename the unreadable file out of the way."""
        self._reset_memory_state()
        if not os.path.exists(MEMORY_FILE_PATH):
            return
        try:
            corrupt_backup_path = f"{MEMORY_FILE_PATH}.corrupt_{datetime.now().strftime('%Y%m%d%H%M%S')}"
            os.rename(MEMORY_FILE_PATH, corrupt_backup_path)
            logger.info(f"Backed up corrupted memory file to {corrupt_backup_path}")
        except OSError as backup_err:
            logger.error(f"Failed to backup corrupted memory file: {backup_err}")

    def _save_memory(self) -> bool:  # This is synchronous
        """Write the current entries to disk via temp file + ``os.replace``.

        Returns:
            True if the memory file was replaced, False if the save failed
            (the error is logged; nothing is raised).
        """
        logger.debug(f"Attempting atomic save to {MEMORY_FILE_PATH}...")
        with self._save_lock:
            # Dump a snapshot: this usually runs in a worker thread while the
            # event loop may keep appending to / trimming self.memory_list.
            entries_snapshot = list(self.memory_list)
            data_to_save = {"schema_version": MEMORY_SCHEMA_VERSION, "entries": entries_snapshot}
            try:
                with open(MEMORY_FILE_TMP, "w", encoding="utf-8") as file:
                    json.dump(data_to_save, file, indent=2)
                os.replace(MEMORY_FILE_TMP, MEMORY_FILE_PATH)
            except Exception as e:
                logger.error(f"❌ Error saving memory: {e}", exc_info=True)
                # Do not leave a half-written temp file behind.
                try:
                    os.remove(MEMORY_FILE_TMP)
                except FileNotFoundError:
                    pass
                except OSError as e_rem:
                    logger.warning(f"Could not remove temp memory file {MEMORY_FILE_TMP}: {e_rem}")
                return False
        logger.info(f"✅ Memory saved successfully ({len(entries_snapshot)} entries).")
        return True

    async def save_memory_async(self) -> bool:
        """Run ``_save_memory`` in a worker thread; returns its success flag."""
        logger.debug("Scheduling asynchronous memory save...")
        return await asyncio.to_thread(self._save_memory)

    async def store_memory_async(self, query: str, response: str, metadata: Optional[Dict[str, Any]] = None):
        """Append a new entry, evict the oldest ones beyond the size limit, and save.

        Args:
            query: The user query; empty values cause the call to be skipped.
            response: The response text; empty values cause the call to be skipped.
            metadata: Optional extra fields. A shallow copy is stored and its
                ``timestamp_iso`` key is always overwritten with the current
                UTC time.

        Errors are logged rather than raised.
        """
        if not query or not response:
            logger.warning("⚠️ Attempted to store empty query or response. Skipping.")
            return
        try:
            current_id_num = self.next_id
            current_metadata = metadata.copy() if metadata is not None else {}  # Work with a copy
            # Ensure a proper ISO-formatted timestamp is in metadata
            current_metadata['timestamp_iso'] = datetime.now(timezone.utc).isoformat()
            memory_entry = {
                'id': str(current_id_num),
                'query': query,
                'response': response,
                'created_at_unix': time.time(),  # Retain for fallback sorting
                'schema_version': MEMORY_SCHEMA_VERSION,
                'metadata': current_metadata,
            }
            self.next_id += 1
            self.memory_list.append(memory_entry)
            self.memory_dict[str(current_id_num)] = memory_entry
            logger.info(f"Stored memory entry ID {current_id_num}.")

            # Trim from the front (oldest first) in one slice operation.
            overflow = len(self.memory_list) - MEMORY_SIZE_LIMIT
            if overflow > 0:
                evicted_entries = self.memory_list[:overflow]
                del self.memory_list[:overflow]
                for evicted in evicted_entries:
                    self.memory_dict.pop(str(evicted.get('id', '')), None)
                logger.info(f"Removed {overflow} oldest entries for size limit.")

            await self.save_memory_async()
        except Exception as e:
            logger.error(f"❌ Error storing memory entry: {e}", exc_info=True)

    @staticmethod
    def _entry_sort_key(entry: Dict[str, Any]) -> datetime:
        """Return a timezone-aware datetime used to order ``entry`` by recency.

        Prefers ``metadata['timestamp_iso']``; falls back to
        ``created_at_unix``, and finally to the Unix epoch. The result is
        always timezone-aware so keys of different entries can be compared:
        a timestamp without an offset is treated as UTC.
        """
        metadata = entry.get('metadata')
        ts_iso = metadata.get('timestamp_iso') if isinstance(metadata, dict) else None
        if isinstance(ts_iso, str) and ts_iso:
            try:
                parsed = datetime.fromisoformat(ts_iso.replace('Z', '+00:00'))
            except ValueError:
                logger.warning(f"Could not parse timestamp_iso '{ts_iso}' for entry ID {entry.get('id')}. Falling back to created_at_unix.")
            else:
                if parsed.tzinfo is None:
                    # Naive and aware datetimes cannot be compared; assume UTC.
                    parsed = parsed.replace(tzinfo=timezone.utc)
                return parsed
        try:
            return datetime.fromtimestamp(float(entry.get('created_at_unix') or 0), timezone.utc)
        except (TypeError, ValueError, OverflowError, OSError):
            return datetime.fromtimestamp(0, timezone.utc)

    async def retrieve_recent_memories_async(self, limit: int = 5) -> List[Dict[str, Any]]:
        """Return up to ``limit`` entries, newest first.

        If sorting fails unexpectedly, falls back to the last ``limit``
        entries of the list in reverse insertion order.
        """
        logger.debug(f"Retrieving up to {limit} recent memories, sorted.")
        if not self.memory_list:
            return []
        try:
            # sorted() builds a new list, so self.memory_list is left untouched.
            sorted_entries = sorted(self.memory_list, key=self._entry_sort_key, reverse=True)
            recent_sorted_memories = sorted_entries[:max(0, limit)]
            logger.info(f"Retrieved {len(recent_sorted_memories)} recent memories (sorted).")
            return recent_sorted_memories
        except Exception as e:
            logger.error(f"❌ Error retrieving and sorting recent memories: {e}", exc_info=True)
            # Fallback to unsorted last N if sorting fails
            return self.memory_list[-limit:][::-1] if limit > 0 and self.memory_list else []

    def load_all_memory_entries_structured(self) -> List[Dict[str, Any]]:
        """Return a shallow copy of all entries in insertion order."""
        logger.info(f"Loading all {len(self.memory_list)} memory entries (structured).")
        return list(self.memory_list)

    async def load_all_memory_entries_structured_async(self) -> List[Dict[str, Any]]:
        """Coroutine variant of ``load_all_memory_entries_structured``."""
        logger.info(f"Asynchronously loading all {len(self.memory_list)} memory entries (structured).")
        return list(self.memory_list)  # For now, direct copy is fine as it's in-memory

    async def retrieve_memory_by_id(self, memory_id: str) -> Optional[Dict[str, Any]]:
        """Return the entry whose ID equals ``str(memory_id)``, or None if absent."""
        memory_id_str = str(memory_id)
        memory = self.memory_dict.get(memory_id_str)
        if memory:
            logger.info(f"Retrieved memory entry ID {memory_id_str}.")
        else:
            logger.warning(f"Memory ID {memory_id_str} not found in dictionary.")
        return memory

    async def retrieve_last_response(self) -> Optional[str]:
        """Return the response of the most recently appended entry.

        Returns None when there are no entries or when that entry's response
        is not a non-blank string.
        """
        if not self.memory_list:
            logger.info("Retrieve last response: Memory list is empty.")
            return None
        try:
            last_entry = self.memory_list[-1]
            response = last_entry.get('response')
            if isinstance(response, str) and response.strip():
                logger.info("Retrieve last response: Found valid response.")
                return response
            logger.warning(f"Retrieve last response: Last entry (ID {last_entry.get('id', 'N/A')}) has empty response.")
            return None
        except Exception as e:
            logger.error(f"❌ Error retrieving last response: {e}", exc_info=True)
            return None

    async def clear_all_memory(self) -> bool:
        """Remove every entry and persist the empty state.

        Returns:
            True only if the empty state was also written to disk. On a
            failed save the in-memory state is still cleared, but False is
            returned because the old file content would come back on reload.
        """
        logger.warning("Initiating complete memory wipe...")
        try:
            self._reset_memory_state()
            if not await self.save_memory_async():
                logger.error("❌ Memory cleared in RAM, but the cleared state could not be saved to disk.")
                return False
            logger.info("✅ All memory cleared successfully.")
            return True
        except Exception as e:
            logger.error(f"❌ Error clearing memory: {e}", exc_info=True)
            return False

    async def delete_memory_by_id(self, memory_id: str) -> bool:
        """Delete the entry with the given ID and persist the change.

        Returns:
            True if the entry existed and the updated state was saved; False
            if the ID is unknown, the save failed, or an error occurred.
        """
        logger.warning(f"Attempting to delete memory ID {memory_id}...")
        memory_id_str = str(memory_id)
        try:
            if memory_id_str not in self.memory_dict:
                logger.warning(f"⚠️ Memory ID {memory_id_str} not found for deletion.")
                return False
            del self.memory_dict[memory_id_str]
            self.memory_list = [m for m in self.memory_list if str(m.get('id', '')) != memory_id_str]
            if not await self.save_memory_async():
                logger.error(f"❌ Memory ID {memory_id_str} removed in RAM, but the change could not be saved to disk.")
                return False
            logger.info(f"✅ Memory with ID {memory_id_str} deleted successfully.")
            return True
        except Exception as e:
            logger.error(f"❌ Error deleting memory ID {memory_id_str}: {e}", exc_info=True)
            return False

    async def get_memory_stats(self) -> Dict[str, Any]:  # This is now async
        """Return entry count, on-disk size (MB), size limit, next ID and schema version."""
        logger.info("Calculating memory statistics...")
        stats: Dict[str, Any] = {
            'total_entries': len(self.memory_list),
            'disk_usage_mb': 0.0,
            'memory_limit': MEMORY_SIZE_LIMIT,
            'next_id': self.next_id,
            'schema_version': MEMORY_SCHEMA_VERSION,
        }
        try:
            if os.path.exists(MEMORY_FILE_PATH):
                file_size_bytes = await asyncio.to_thread(os.path.getsize, MEMORY_FILE_PATH)
                stats['disk_usage_mb'] = round(file_size_bytes / (1024 * 1024), 3)
            logger.info(f"Memory Stats: {stats}")
        except Exception as e:
            logger.error(f"❌ Error calculating memory file size: {e}", exc_info=True)
        return stats

    def export_memory_to_file_sync(self) -> Optional[str]:
        """Synchronously exports memory file. Returns exported file path or None.

        The on-disk memory file is copied to the first existing directory
        among ~/Desktop, ~/Downloads and the memory directory itself, under a
        timestamped name.
        """
        # NOTE (Future Personalization): Consider allowing user to choose export location via UI dialog.
        if not os.path.exists(MEMORY_FILE_PATH):
            logger.warning("No memory file to export because it doesn't exist.")
            return None
        if not self.memory_list:
            # An empty "entries" structure is still exported; only warn.
            logger.warning("No memory entries to export, memory file might be empty or just schema.")
        try:
            export_dir = Path.home() / "Desktop"
            if not (export_dir.exists() and export_dir.is_dir()):
                export_dir = Path.home() / "Downloads"
                if not (export_dir.exists() and export_dir.is_dir()):
                    export_dir = Path(MEMORY_DIR)  # Fallback
                    logger.warning(f"Desktop/Downloads not found/accessible, exporting to memory directory: {export_dir}")
            os.makedirs(export_dir, exist_ok=True)  # Ensure export_dir exists
            timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S')
            export_filename = f"zotheos_memory_export_{timestamp_str}.json"
            export_full_path = export_dir / export_filename
            import shutil
            shutil.copy2(MEMORY_FILE_PATH, export_full_path)
            logger.info(f"✅ Memory successfully exported to: {export_full_path}")
            return str(export_full_path)
        except Exception as e:
            logger.error(f"❌ Failed to export memory: {e}", exc_info=True)
            return None

    async def export_memory_to_file_async(self) -> Optional[str]:
        """Asynchronously exports the memory file."""
        logger.info("Scheduling asynchronous memory export...")
        return await asyncio.to_thread(self.export_memory_to_file_sync)
async def main_test():
    """Manual smoke test: clear, store six entries, list the newest five, export.

    Operates on the real memory file at MEMORY_FILE_PATH; an existing file is
    cleared first so the run starts from an empty state.
    """
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
    logger.info("--- MemoryBank Test ---")
    bank = MemoryBank()
    logger.info(f"Initial memory file path: {MEMORY_FILE_PATH}")

    if not os.path.exists(MEMORY_FILE_PATH):
        logger.info("No pre-existing memory file found for test.")
    else:
        logger.info("Memory file exists. Clearing for fresh test.")
        await bank.clear_all_memory()

    stats_after_clear = await bank.get_memory_stats()
    logger.info(f"Stats after potential clear: {stats_after_clear}")

    # Test Store
    sample_interactions = [
        ("Query 1", "Response 1", {"custom_field": "value1", "tier_at_interaction": "free"}),
        ("Query 2", "Response 2", {"user_token_used_prefix": "tes***", "synthesized_summary_text": "Summary for Q2"}),
        ("Query 3", "Response 3", {"tier_at_interaction": "pro", "synthesized_summary_text": "Summary for Q3"}),
        ("Query 4", "Response 4", {}),  # No extra metadata
        ("Query 5", "Response 5", {"synthesized_summary_text": "This is summary 5."}),
        ("Query 6", "Response 6", {"synthesized_summary_text": "This is summary 6, a bit longer than the preview."}),
    ]
    for index, (sample_query, sample_response, sample_meta) in enumerate(sample_interactions):
        if index:
            # Short pause between stores so consecutive timestamps differ.
            await asyncio.sleep(0.01)
        await bank.store_memory_async(sample_query, sample_response, metadata=sample_meta)

    recent_for_display = await bank.retrieve_recent_memories_async(limit=5)
    logger.info("Recent 5 (for display, should be newest first - Q6, Q5, Q4, Q3, Q2):")
    for position, item in enumerate(recent_for_display, start=1):
        item_meta = item.get('metadata', {})
        ts = item_meta.get('timestamp_iso', item.get('created_at_unix'))
        has_summary = 'synthesized_summary_text' in item.get('metadata', {})
        logger.info(f" {position}. ID: {item.get('id')}, Timestamp: {ts}, Query: {item.get('query')[:20]}..., Summary in meta: {has_summary}")

    stats_after_store = await bank.get_memory_stats()
    logger.info(f"Memory Stats after storing: {stats_after_store}")

    exported_file = await bank.export_memory_to_file_async()
    if not exported_file:
        logger.error("Test export failed.")
    else:
        logger.info(f"Test export successful: {exported_file}")
    logger.info("--- MemoryBank Test Complete ---")


if __name__ == "__main__":
    asyncio.run(main_test())