Spaces:
Running
Running
File size: 19,757 Bytes
4d1849f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 |
# FILE: modules/memory_bank_public.py (Finalized for Beta Checklist & Async Correctness)
import os
import json
import time
import asyncio
import logging
import traceback # For more detailed error logging if needed
from typing import List, Dict, Optional, Any
from json import JSONDecodeError
from datetime import datetime, timezone # For ISO timestamps
from pathlib import Path # For home directory in export
import sys # For logger setup if run standalone
logger = logging.getLogger("ZOTHEOS_MemoryBank")
if not logger.handlers:
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# --- Define memory file paths and limits ---
try:
# Path logic for when memory_bank_public.py is in 'modules' folder
# and data directory is in project root ('../zotheos_public_data')
current_file_dir = os.path.dirname(os.path.abspath(__file__))
project_root_dir = os.path.dirname(current_file_dir) # This is ZOTHEOS_Release_Package
# Check if running from PyInstaller bundle
if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
# If bundled, data directory is relative to sys._MEIPASS (the temp extraction folder)
DATA_BASE_DIR = os.path.join(sys._MEIPASS, "zotheos_public_data")
else:
# If running as script, data directory is relative to project root
DATA_BASE_DIR = os.path.join(project_root_dir, "zotheos_public_data")
if not os.path.exists(DATA_BASE_DIR):
os.makedirs(DATA_BASE_DIR, exist_ok=True)
logger.info(f"Created data base directory: {DATA_BASE_DIR}")
MEMORY_DIR = os.path.join(DATA_BASE_DIR, "zotheos_memory")
if not os.path.exists(MEMORY_DIR):
os.makedirs(MEMORY_DIR, exist_ok=True)
logger.info(f"Created memory directory: {MEMORY_DIR}")
except Exception as e:
logger.critical(f"β Failed to setup base/memory directory for MemoryBank: {e}. Using fallback in user home.")
MEMORY_DIR = os.path.join(os.path.expanduser("~"), ".zotheos", "zotheos_memory")
os.makedirs(MEMORY_DIR, exist_ok=True)
MEMORY_FILE_PATH = os.path.join(MEMORY_DIR, "zotheos_memory.json")
MEMORY_FILE_TMP = os.path.join(MEMORY_DIR, "zotheos_memory_tmp.json")
MEMORY_SIZE_LIMIT = 1000
MEMORY_SCHEMA_VERSION = 1.2 # Bumped for metadata structure in entry
class MemoryBank:
def __init__(self):
self.memory_list: List[Dict[str, Any]] = []
self.memory_dict: Dict[str, Dict[str, Any]] = {} # For ID-based lookups, ID should be string
self.next_id = 0
logger.info(f"π§ Initializing Memory Bank. Memory file: {MEMORY_FILE_PATH}")
self._load_memory() # Load initial state
if self.memory_list:
try:
# Ensure IDs are treated as integers for max() if they are numeric strings
numeric_ids = [int(m.get('id', -1)) for m in self.memory_list if str(m.get('id', '')).isdigit()]
if numeric_ids:
max_id = max(numeric_ids)
self.next_id = max(max_id + 1, len(self.memory_list))
else: # No numeric IDs found
self.next_id = len(self.memory_list)
except ValueError: # Fallback if conversion to int fails for some reason
self.next_id = len(self.memory_list)
logger.info(f"Loaded {len(self.memory_list)} memories. Next ID set to {self.next_id}.")
else:
logger.info("Initialized with empty memory.")
def _reset_memory_state(self):
self.memory_list = []
self.memory_dict = {}
self.next_id = 0
logger.info("Memory state has been reset.")
def _load_memory(self):
logger.info(f"Attempting to load memory from {MEMORY_FILE_PATH}...")
try:
if os.path.exists(MEMORY_FILE_PATH):
with open(MEMORY_FILE_PATH, "r", encoding="utf-8") as file:
data = json.load(file)
if isinstance(data, dict) and "entries" in data and isinstance(data["entries"], list):
self.memory_list = data["entries"]
# Rebuild dictionary, ensuring IDs are strings for keys
self.memory_dict = {str(m['id']): m for m in self.memory_list if m.get('id') is not None}
logger.info(f"β
Successfully loaded {len(self.memory_list)} memory entries (schema version: {data.get('schema_version', 'Unknown')}).")
elif isinstance(data, list):
logger.warning(f"Old memory format (list) detected. Converting and saving in new format.")
self.memory_list = data
self.memory_dict = {str(m.get('id','')): m for m in self.memory_list if m.get('id') is not None}
self._save_memory() # Save immediately in new format
else:
logger.warning(f"β οΈ Memory file {MEMORY_FILE_PATH} has an unexpected main structure. Resetting memory.")
self._reset_memory_state()
else:
logger.info(f"β
No existing memory file found at {MEMORY_FILE_PATH}. Starting fresh.")
self._reset_memory_state()
except JSONDecodeError as e:
logger.error(f"β Error decoding JSON from memory file {MEMORY_FILE_PATH}: {e}. File might be corrupted. Resetting memory.", exc_info=False)
self._handle_corrupted_memory_file()
except FileNotFoundError:
logger.info(f"β
Memory file {MEMORY_FILE_PATH} not found. Starting fresh (FileNotFound).")
self._reset_memory_state()
except Exception as e:
logger.error(f"β Unexpected error loading memory: {e}. Resetting memory.", exc_info=True)
self._reset_memory_state()
def _handle_corrupted_memory_file(self):
self._reset_memory_state()
if os.path.exists(MEMORY_FILE_PATH):
try:
corrupt_backup_path = f"{MEMORY_FILE_PATH}.corrupt_{datetime.now().strftime('%Y%m%d%H%M%S')}"
os.rename(MEMORY_FILE_PATH, corrupt_backup_path)
logger.info(f"Backed up corrupted memory file to {corrupt_backup_path}")
except OSError as backup_err:
logger.error(f"Failed to backup corrupted memory file: {backup_err}")
def _save_memory(self): # This is synchronous
logger.debug(f"Attempting atomic save to {MEMORY_FILE_PATH}...")
try:
data_to_save = {"schema_version": MEMORY_SCHEMA_VERSION, "entries": self.memory_list}
with open(MEMORY_FILE_TMP, "w", encoding="utf-8") as file:
json.dump(data_to_save, file, indent=2)
os.replace(MEMORY_FILE_TMP, MEMORY_FILE_PATH)
logger.info(f"β
Memory saved successfully ({len(self.memory_list)} entries).")
except Exception as e:
logger.error(f"β Error saving memory: {e}", exc_info=True)
if os.path.exists(MEMORY_FILE_TMP):
try: os.remove(MEMORY_FILE_TMP)
except OSError: pass
finally:
if os.path.exists(MEMORY_FILE_TMP):
try: os.remove(MEMORY_FILE_TMP)
except OSError as e_rem: logger.warning(f"Could not remove temp memory file {MEMORY_FILE_TMP}: {e_rem}")
async def save_memory_async(self):
logger.debug("Scheduling asynchronous memory save...")
await asyncio.to_thread(self._save_memory)
async def store_memory_async(self, query: str, response: str, metadata: Optional[Dict[str, Any]] = None):
if not query or not response:
logger.warning("β οΈ Attempted to store empty query or response. Skipping.")
return
try:
current_id_num = self.next_id
current_metadata = metadata.copy() if metadata is not None else {} # Work with a copy
# Ensure a proper ISO-formatted timestamp is in metadata
current_metadata['timestamp_iso'] = datetime.now(timezone.utc).isoformat()
memory_entry = {
'id': str(current_id_num),
'query': query,
'response': response, # This is the full_fused_output from main_fusion
'created_at_unix': time.time(), # Retain for fallback sorting
'schema_version': MEMORY_SCHEMA_VERSION,
'metadata': current_metadata # This now contains timestamp_iso and other details
}
self.next_id += 1
self.memory_list.append(memory_entry)
self.memory_dict[str(current_id_num)] = memory_entry
logger.info(f"Stored memory entry ID {current_id_num}.")
removed_count = 0
while len(self.memory_list) > MEMORY_SIZE_LIMIT:
oldest_memory = self.memory_list.pop(0)
oldest_id = str(oldest_memory.get('id', ''))
if oldest_id and oldest_id in self.memory_dict: del self.memory_dict[oldest_id]
removed_count += 1
if removed_count > 0: logger.info(f"Removed {removed_count} oldest entries for size limit.")
await self.save_memory_async()
except Exception as e: logger.error(f"β Error storing memory entry: {e}", exc_info=True)
async def retrieve_recent_memories_async(self, limit: int = 5) -> List[Dict[str, Any]]:
logger.debug(f"Retrieving up to {limit} recent memories, sorted.")
if not self.memory_list: return []
try:
def get_sort_key(entry):
ts_iso = entry.get('metadata', {}).get('timestamp_iso')
# Fallback to created_at_unix if timestamp_iso is missing or unparsable
if ts_iso:
try: return datetime.fromisoformat(ts_iso.replace('Z', '+00:00'))
except ValueError:
logger.warning(f"Could not parse timestamp_iso '{ts_iso}' for entry ID {entry.get('id')}. Falling back to created_at_unix.")
pass # Fall through to use created_at_unix
return datetime.fromtimestamp(entry.get('created_at_unix', 0), timezone.utc)
# Make a copy for sorting to avoid modifying self.memory_list if other operations occur
sorted_entries = sorted(list(self.memory_list), key=get_sort_key, reverse=True)
actual_limit = max(0, min(limit, len(sorted_entries)))
recent_sorted_memories = sorted_entries[:actual_limit]
logger.info(f"Retrieved {len(recent_sorted_memories)} recent memories (sorted).")
return recent_sorted_memories
except Exception as e:
logger.error(f"β Error retrieving and sorting recent memories: {e}", exc_info=True)
# Fallback to unsorted last N if sorting fails
return self.memory_list[-limit:][::-1] if limit > 0 and self.memory_list else []
def load_all_memory_entries_structured(self) -> List[Dict[str, Any]]:
logger.info(f"Loading all {len(self.memory_list)} memory entries (structured).")
return list(self.memory_list)
async def load_all_memory_entries_structured_async(self) -> List[Dict[str, Any]]:
logger.info(f"Asynchronously loading all {len(self.memory_list)} memory entries (structured).")
return list(self.memory_list) # For now, direct copy is fine as it's in-memory
async def retrieve_memory_by_id(self, memory_id: str) -> Optional[Dict[str, Any]]:
memory_id_str = str(memory_id)
try:
memory = self.memory_dict.get(memory_id_str)
if memory: logger.info(f"Retrieved memory entry ID {memory_id_str}.")
else: logger.warning(f"Memory ID {memory_id_str} not found in dictionary.")
return memory
except Exception as e: logger.error(f"β Error retrieving memory ID {memory_id_str}: {e}", exc_info=True); return None
async def retrieve_last_response(self) -> Optional[str]:
if not self.memory_list: logger.info("Retrieve last response: Memory list is empty."); return None
try:
last_entry = self.memory_list[-1]; response = last_entry.get('response')
if isinstance(response, str) and response.strip(): logger.info("Retrieve last response: Found valid response."); return response
else: logger.warning(f"Retrieve last response: Last entry (ID {last_entry.get('id', 'N/A')}) has empty response."); return None
except Exception as e: logger.error(f"β Error retrieving last response: {e}", exc_info=True); return None
async def clear_all_memory(self):
logger.warning("Initiating complete memory wipe...")
try: self._reset_memory_state(); await self.save_memory_async(); logger.info("β
All memory cleared successfully."); return True
except Exception as e: logger.error(f"β Error clearing memory: {e}", exc_info=True); return False
async def delete_memory_by_id(self, memory_id: str):
logger.warning(f"Attempting to delete memory ID {memory_id}...")
memory_id_str = str(memory_id)
try:
if memory_id_str in self.memory_dict:
del self.memory_dict[memory_id_str]
self.memory_list = [m for m in self.memory_list if str(m.get('id', '')) != memory_id_str]
await self.save_memory_async()
logger.info(f"β
Memory with ID {memory_id_str} deleted successfully.")
return True
else:
logger.warning(f"β οΈ Memory ID {memory_id_str} not found for deletion.")
return False
except Exception as e:
logger.error(f"β Error deleting memory ID {memory_id_str}: {e}", exc_info=True)
return False
async def get_memory_stats(self) -> Dict[str, Any]: # This is now async
logger.info("Calculating memory statistics...")
stats: Dict[str, Any] = {'total_entries': len(self.memory_list), 'disk_usage_mb': 0.0, 'memory_limit': MEMORY_SIZE_LIMIT, 'next_id': self.next_id, 'schema_version': MEMORY_SCHEMA_VERSION }
try:
if os.path.exists(MEMORY_FILE_PATH):
file_size_bytes = await asyncio.to_thread(os.path.getsize, MEMORY_FILE_PATH)
stats['disk_usage_mb'] = round(file_size_bytes / (1024 * 1024), 3)
logger.info(f"Memory Stats: {stats}")
except Exception as e: logger.error(f"β Error calculating memory file size: {e}", exc_info=True)
return stats
def export_memory_to_file_sync(self) -> Optional[str]:
"""Synchronously exports memory file. Returns exported file path or None."""
# NOTE (Future Personalization): Consider allowing user to choose export location via UI dialog.
if not os.path.exists(MEMORY_FILE_PATH):
logger.warning("No memory file to export because it doesn't exist.")
return None
if not self.memory_list: # Also check if there are any entries to export
logger.warning("No memory entries to export, memory file might be empty or just schema.")
# Decide if you want to export an empty "entries" file or return None
# For now, let's allow exporting an empty structure.
# return None
try:
export_dir = Path.home() / "Desktop"
if not (export_dir.exists() and export_dir.is_dir()):
export_dir = Path.home() / "Downloads"
if not (export_dir.exists() and export_dir.is_dir()):
export_dir = Path(MEMORY_DIR) # Fallback
logger.warning(f"Desktop/Downloads not found/accessible, exporting to memory directory: {export_dir}")
os.makedirs(export_dir, exist_ok=True) # Ensure export_dir exists
timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S')
export_filename = f"zotheos_memory_export_{timestamp_str}.json"
export_full_path = export_dir / export_filename
import shutil
shutil.copy2(MEMORY_FILE_PATH, export_full_path)
logger.info(f"β
Memory successfully exported to: {export_full_path}")
return str(export_full_path)
except Exception as e:
logger.error(f"β Failed to export memory: {e}", exc_info=True)
return None
async def export_memory_to_file_async(self) -> Optional[str]:
"""Asynchronously exports the memory file."""
logger.info("Scheduling asynchronous memory export...")
return await asyncio.to_thread(self.export_memory_to_file_sync)
async def main_test():
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
logger.info("--- MemoryBank Test ---")
mb = MemoryBank()
logger.info(f"Initial memory file path: {MEMORY_FILE_PATH}")
if os.path.exists(MEMORY_FILE_PATH):
logger.info("Memory file exists. Clearing for fresh test.")
await mb.clear_all_memory()
else:
logger.info("No pre-existing memory file found for test.")
stats1 = await mb.get_memory_stats()
logger.info(f"Stats after potential clear: {stats1}")
# Test Store
await mb.store_memory_async("Query 1", "Response 1", metadata={"custom_field": "value1", "tier_at_interaction": "free"})
await asyncio.sleep(0.01)
await mb.store_memory_async("Query 2", "Response 2", metadata={"user_token_used_prefix": "tes***", "synthesized_summary_text": "Summary for Q2"})
await asyncio.sleep(0.01)
await mb.store_memory_async("Query 3", "Response 3", metadata={"tier_at_interaction": "pro", "synthesized_summary_text": "Summary for Q3"})
await asyncio.sleep(0.01)
await mb.store_memory_async("Query 4", "Response 4", metadata={}) # No extra metadata
await asyncio.sleep(0.01)
await mb.store_memory_async("Query 5", "Response 5", metadata={"synthesized_summary_text": "This is summary 5."})
await asyncio.sleep(0.01)
await mb.store_memory_async("Query 6", "Response 6", metadata={"synthesized_summary_text": "This is summary 6, a bit longer than the preview."})
recent_for_display = await mb.retrieve_recent_memories_async(limit=5)
logger.info(f"Recent 5 (for display, should be newest first - Q6, Q5, Q4, Q3, Q2):")
for i, item in enumerate(recent_for_display):
ts = item.get('metadata',{}).get('timestamp_iso', item.get('created_at_unix'))
logger.info(f" {i+1}. ID: {item.get('id')}, Timestamp: {ts}, Query: {item.get('query')[:20]}..., Summary in meta: {'synthesized_summary_text' in item.get('metadata', {})}")
stats2 = await mb.get_memory_stats()
logger.info(f"Memory Stats after storing: {stats2}")
exported_file = await mb.export_memory_to_file_async()
if exported_file: logger.info(f"Test export successful: {exported_file}")
else: logger.error("Test export failed.")
logger.info("--- MemoryBank Test Complete ---")
if __name__ == "__main__":
asyncio.run(main_test()) |