ZOTHEOS-App / modules /main_fusion_public.py
ZOTHEOS's picture
Upload 9 files
4d1849f verified
raw
history blame
26.2 kB
# FILE: modules/main_fusion_public.py (Finalized with Tier Logic, Async, Fusion Summary)
import asyncio
import logging
import os
import sys
import time
import json
from typing import Dict, Any, Optional, List, Union
try:
# Correctly determine project_root assuming this file is in 'modules'
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)
if project_root not in sys.path:
sys.path.insert(0, project_root)
except Exception as e_path:
# Basic logging if path setup fails, though critical
logging.basicConfig(level=logging.ERROR)
logging.error(f"Error setting up sys.path in main_fusion_public.py: {e_path}")
try:
from modules.config_settings_public import (
MODEL_PATHS, MAX_CONCURRENT_MODELS, MAX_RAM_MODELS_GB, # Used by ModelManager init
DEFAULT_SYSTEM_PROMPT, SYSTEM_PERSONAS, INFERENCE_PRESETS, DEFAULT_INFERENCE_PRESET,
MODEL_ROLES, MODEL_ROLE_SYSTEM_PROMPTS
)
from modules.model_manager_public import ModelManager
from modules.memory_bank_public import MemoryBank
from modules.user_auth import get_user_tier
except ImportError as e:
logging.basicConfig(level=logging.CRITICAL, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.critical(f"CRITICAL IMPORT ERROR in main_fusion_public.py: {e}. ZOTHEOS may not function.", exc_info=True)
# Provide fallbacks for critical components if possible, or allow failure
# For now, if these fail, the __init__ will likely raise an error or log critical status.
# Making the application fail loudly is often better than silent dysfunction.
# Consider exiting if critical imports fail: sys.exit(f"Fatal Import Error in main_fusion_public.py: {e}")
# --- Start of FIX: Define LLAMA_CPP_AVAILABLE ---
try:
# Attempt to import the core Llama class from llama_cpp
from llama_cpp import Llama # You might also need LlamaCppError if you use it
LLAMA_CPP_AVAILABLE = True
# print("DEBUG: llama_cpp imported successfully, LLAMA_CPP_AVAILABLE=True") # Optional debug print
except ImportError:
LLAMA_CPP_AVAILABLE = False
# print("DEBUG: llama_cpp import failed, LLAMA_CPP_AVAILABLE=False") # Optional debug print
except Exception as e_llama_import: # Catch other potential errors during import
LLAMA_CPP_AVAILABLE = False
# print(f"DEBUG: An unexpected error occurred during llama_cpp import: {e_llama_import}, LLAMA_CPP_AVAILABLE=False") # Optional debug print
# --- End of FIX ---
logger = logging.getLogger("ZOTHEOS_MainFusion")
if not logger.handlers:
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
class MainFusionPublic:
def __init__(self, device_preference: Optional[str] = "cuda"):
logger.info("🚀 ZOTHEOS MainFusion Initializing (Tier Logic, Async Fusion Summary Enabled)...")
self.config = {
"MODEL_ROLES": MODEL_ROLES,
"MODEL_ROLE_SYSTEM_PROMPTS": MODEL_ROLE_SYSTEM_PROMPTS,
"DEFAULT_SYSTEM_PROMPT": DEFAULT_SYSTEM_PROMPT,
"TIER_CONFIG": {
"free": {"model_limit": 2, "memory_enabled": False, "display_name": "Free Tier"},
"starter": {"model_limit": 3, "memory_enabled": True, "display_name": "Starter Tier"},
# Pro uses max models from MODEL_PATHS, ensuring it uses all available configured models
"pro": {"model_limit": len(MODEL_PATHS.keys()) if MODEL_PATHS else 3, "memory_enabled": True, "display_name": "Pro Tier"}
}
}
self.models_last_queried_for_perspectives: List[str] = []
try:
# Pass global config values for ModelManager initialization
logger.info(f"Initializing ModelManager with device_preference='{device_preference}', max_count={MAX_CONCURRENT_MODELS}, max_ram_gb={MAX_RAM_MODELS_GB}...")
self.model_manager = ModelManager(
device_preference=device_preference,
max_model_count=MAX_CONCURRENT_MODELS,
max_ram_models_gb=MAX_RAM_MODELS_GB
)
# active_model_names_in_order is the master list of all models ZOTHEOS *could* use
self.active_model_names_in_order: List[str] = list(MODEL_PATHS.keys()) if MODEL_PATHS else []
logger.info(f"✅ MainFusion initialized. Max available models for fusion: {self.active_model_names_in_order}")
except Exception as e_mm_init:
logger.critical(f"❌ CRITICAL: ModelManager failed to initialize in MainFusion: {e_mm_init}", exc_info=True)
self.model_manager = None
self.active_model_names_in_order = []
try:
self.memory_bank = MemoryBank()
logger.info("✅ MemoryBank initialized.")
except Exception as e_mb_init:
logger.error(f"❌ MemoryBank failed to initialize: {e_mb_init}. Interactions may not be stored.", exc_info=True)
self.memory_bank = None
if not self.model_manager or not LLAMA_CPP_AVAILABLE: # Check if ModelManager itself or Llama backend failed
logger.critical("MainFusion started in a DEGRADED state: ModelManager or Llama.cpp backend is UNAVAILABLE.")
async def _get_single_model_response_direct(self, model_name: str, user_query: str, system_prompt_for_call: str, preset_name_for_call: str) -> Dict[str, Any]:
response_text = f"[Error: Model '{model_name}' generation did not complete or model unavailable]"
start_time_model = time.perf_counter()
status = "Model Error or Unavailable"
if not self.model_manager:
logger.error(f"[{model_name}] ModelManager not available for generation.")
return {"model": model_name, "text": "[Error: ModelManager is offline]", "time_ms": 0, "status": "ModelManager Offline"}
try:
logger.info(f"⚙️ [{model_name}] Calling ModelManager.generate_with_model. System Prompt: '{system_prompt_for_call[:50]}...'. User Query: '{user_query[:50].replace(chr(10),' ')}...'")
# generate_with_model handles actual call to Llama instance via async_generation_wrapper
response_text = await self.model_manager.generate_with_model(
model_name=model_name,
prompt=user_query,
preset_name=preset_name_for_call,
system_prompt=system_prompt_for_call
)
preview = response_text[:100].replace("\n", " ")
if response_text.startswith(("[Error:", "[No text generated", "[Malformed response")) or not response_text.strip():
logger.warning(f"⚠️ [{model_name}] Model returned an error string or empty content: {preview}...")
status = "Model Error or Empty Response"
else:
logger.info(f"✅ [{model_name}] Response received ({len(response_text)} chars): {preview}...")
status = "Success"
except Exception as e:
logger.error(f"❌ [{model_name}] Exception in _get_single_model_response_direct: {e}", exc_info=True)
response_text = f"[Error: MainFusion encountered an exception during {model_name} generation: {type(e).__name__}]"
status = "MainFusion Exception"
inference_time_ms = (time.perf_counter() - start_time_model) * 1000
return {"model": model_name, "text": response_text.strip(), "time_ms": round(inference_time_ms, 2), "status": status}
async def _generate_true_fusion_summary(self, raw_responses_dict: Dict[str, str], original_query: str, models_actually_used: List[str]) -> str:
logger.info(f"Attempting to generate a true fusion summary from {len(models_actually_used)} perspective(s).")
summarizer_model_key = "gemma" # Default summarizer, ensure it's in MODEL_PATHS and loadable
valid_responses_for_summary = {
model: text for model, text in raw_responses_dict.items()
if model in models_actually_used and text and not text.startswith("[Error:")
}
if not valid_responses_for_summary: # No valid responses at all
logger.warning("No valid responses available to generate a fusion summary.")
return "A summary could not be generated as no valid perspectives were successfully gathered."
if len(valid_responses_for_summary) == 1:
single_model_name = list(valid_responses_for_summary.keys())[0]
logger.info(f"Only one valid perspective from {single_model_name}. Using it as the summary.")
return f"Based on the single available perspective from **{single_model_name.capitalize()}**:\n\n{list(valid_responses_for_summary.values())[0]}"
fusion_prompt = f"Original User Question: \"{original_query}\"\n\n"
fusion_prompt += "You are the Fusion Summary engine for ZOTHEOS. Your task is to read the multiple perspectives provided below and write a concise, balanced, and synthesized summary. Capture the most important points, common themes, and notable differences from all viewpoints. Your tone should be thoughtful, neutral, and respectful. Structure the summary clearly.\n\n"
for model_name, text in valid_responses_for_summary.items():
role = self.config["MODEL_ROLES"].get(model_name, "General")
fusion_prompt += f"--- PERSPECTIVE FROM {model_name.upper()} ({role.capitalize()}) ---\n{text}\n\n"
fusion_prompt += "--- SYNTHESIZED SUMMARY (Combine the perspectives above into a unified insight) ---\n"
logger.info(f"Calling summarizer model '{summarizer_model_key}' (from available: {self.active_model_names_in_order}) for fusion summary.")
if not self.model_manager:
logger.error("ModelManager not available for generating fusion summary.")
return "[Error: Summarizer service unavailable - ModelManager offline]"
try:
# The system prompt for the summarizer itself
summarizer_system_prompt = "You are an expert synthesis AI. Your role is to create a coherent and insightful summary from the provided texts."
summary_text = await self.model_manager.generate_with_model(
model_name=summarizer_model_key,
prompt=fusion_prompt, # The full context with all perspectives
preset_name="precise", # Use a precise preset for summarization
system_prompt=summarizer_system_prompt
)
if summary_text and not summary_text.startswith("[Error:"):
logger.info("✅ True fusion summary generated successfully.")
return summary_text.strip()
else:
logger.warning(f"Summarizer model '{summarizer_model_key}' returned an error or empty response: {summary_text}")
return "[Warning: Summary generation was partial or failed. Displaying raw perspectives.]"
except Exception as e:
logger.error(f"❌ Exception while generating true fusion summary with '{summarizer_model_key}': {e}", exc_info=True)
return f"[Error: The summary generation process failed. Exception: {type(e).__name__}]"
def _analyze_responses_basic(self, responses_dict: Dict[str, str], model_roles: Dict[str, str]) -> Dict[str, Any]:
valid_responses = {model: text for model, text in responses_dict.items() if text and not text.startswith("[Error:")}
consensus_points = []
if len(valid_responses) > 1: consensus_points.append("Multiple perspectives were gathered and synthesized.")
elif len(valid_responses) == 1: consensus_points.append("A single primary perspective was available for synthesis.")
else: consensus_points.append("No valid primary perspectives were available for synthesis.")
return {"consensus_points": consensus_points, "contradictions": [], "unique_insights": valid_responses}
def _synthesize_fusion_response(self, analysis_result: dict, model_roles: dict, raw_responses_dict: dict, final_summary_text: str, models_used_for_perspectives: List[str]) -> str:
response_parts = []
response_parts.append("## ✨ ZOTHEOS Final Synthesized Insight ✨")
response_parts.append(final_summary_text if final_summary_text and not final_summary_text.startswith(("[Error:", "[Warning:")) else "*Synthesis process encountered an issue or no summary was generated. Please see detailed perspectives below.*")
response_parts.append("\n---\n")
response_parts.append("### 💬 Detailed Individual Perspectives")
has_any_valid_perspectives = False
for model_name in models_used_for_perspectives:
text = raw_responses_dict.get(model_name)
role = model_roles.get(model_name, "General") # Default role
response_parts.append(f"**Perspective from {model_name.capitalize()} ({role.capitalize()}):**")
if text and not text.startswith("[Error:"):
response_parts.append(text.strip())
has_any_valid_perspectives = True
else:
response_parts.append(f"*{text if text else '[No response or error from this model.]'}*") # Display error or placeholder
response_parts.append("")
if not has_any_valid_perspectives and not (final_summary_text and not final_summary_text.startswith(("[Error:", "[Warning:"))):
# If summary also failed/empty and no valid individual perspectives.
response_parts = ["## ⚠️ ZOTHEOS Alert\n\nUnfortunately, ZOTHEOS encountered issues processing your query with all available AI cores for your tier. No insights could be gathered at this time. Please try rephrasing your query or try again later."]
elif not has_any_valid_perspectives : # Summary might be there, but no individual details.
response_parts.append("*No valid individual perspectives were successfully retrieved to display in detail.*")
return "\n".join(response_parts).strip()
async def process_query_with_fusion(
self,
query: str,
user_token: Optional[str] = None,
persona_key: Optional[str] = None,
fusion_mode_override: str = "balanced",
**kwargs
) -> str:
process_start_time = time.time()
current_tier_name = get_user_tier(user_token if user_token else "")
tier_settings = self.config["TIER_CONFIG"].get(current_tier_name, self.config["TIER_CONFIG"]["free"])
tier_model_limit = tier_settings["model_limit"]
tier_memory_enabled = tier_settings["memory_enabled"]
logger.info(f"User Tier: '{current_tier_name}' ({tier_settings['display_name']}). Model Limit: {tier_model_limit}, Memory: {'Enabled' if tier_memory_enabled else 'Disabled'}.")
if not self.model_manager or not LLAMA_CPP_AVAILABLE: return "[Error: ZOTHEOS Core Model Manager not ready or Llama.cpp backend unavailable.]"
if not self.active_model_names_in_order: return "[Error: ZOTHEOS Core not ready. No models configured in MODEL_PATHS.]"
if not query or not query.strip(): return "[Error: Query is empty. Please provide a question or topic.]"
current_query_text = query
current_preset_name = fusion_mode_override if fusion_mode_override in INFERENCE_PRESETS else DEFAULT_INFERENCE_PRESET
base_persona_prompt = SYSTEM_PERSONAS.get(persona_key or "default", self.config["DEFAULT_SYSTEM_PROMPT"])
# Determine actual models to use based on tier limit and availability
models_to_use_for_perspectives = [m for m in self.active_model_names_in_order if m in MODEL_PATHS][:tier_model_limit]
self.models_last_queried_for_perspectives = models_to_use_for_perspectives # For status report
if not models_to_use_for_perspectives:
logger.error(f"No models available for tier '{current_tier_name}' after applying limit of {tier_model_limit}.")
return f"[Error: No models available for your current tier ('{current_tier_name}').]"
logger.info(f"🔎 Processing query. Models for perspectives (Tier: {current_tier_name}): {models_to_use_for_perspectives}. Preset: '{current_preset_name}'. Query: '{current_query_text[:60].replace(chr(10),' ')}...'")
raw_responses_dict: Dict[str, str] = {}
individual_results_for_memory: List[Dict[str, Any]] = []
successful_responses = 0
for model_name in models_to_use_for_perspectives:
model_role = self.config["MODEL_ROLES"].get(model_name, "general")
system_prompt_for_model = self.config["MODEL_ROLE_SYSTEM_PROMPTS"].get(model_role, base_persona_prompt)
query_for_this_model = current_query_text
if model_name.lower() == "gemma" and system_prompt_for_model:
query_for_this_model = f"<start_of_turn>user\n{system_prompt_for_model.strip()}\n{current_query_text}<end_of_turn>\n<start_of_turn>model\n"
system_prompt_for_model = ""
model_output_data = await self._get_single_model_response_direct(model_name, query_for_this_model, system_prompt_for_model, current_preset_name)
individual_results_for_memory.append(model_output_data)
raw_responses_dict[model_name] = model_output_data.get("text", "[Error: No text field in response data]")
if model_output_data.get("status") == "Success":
successful_responses += 1
synthesized_summary_text = await self._generate_true_fusion_summary(raw_responses_dict, current_query_text, models_to_use_for_perspectives)
analysis_result = self._analyze_responses_basic(raw_responses_dict, self.config["MODEL_ROLES"]) # Basic analysis for now
final_fused_output_content = self._synthesize_fusion_response(analysis_result, self.config["MODEL_ROLES"], raw_responses_dict, synthesized_summary_text, models_to_use_for_perspectives)
persona_display = (persona_key or "default").capitalize()
mode_display = current_preset_name.capitalize()
tier_display_name = tier_settings.get("display_name", current_tier_name.capitalize())
final_header = f"## 🧠 ZOTHEOS Fused Perspectives 🧠\n*(Fusion Mode: {mode_display} | Persona: {persona_display} | Tier: {tier_display_name})*\n\n"
final_fused_output = final_header + final_fused_output_content
if successful_responses == 0 and not "[Error:" in final_fused_output_content and not "[Warning:" in final_fused_output_content:
logger.error(f"All models ({len(models_to_use_for_perspectives)}) failed for tier '{current_tier_name}'.")
final_fused_output = final_header + "[Critical Error: ZOTHEOS was unable to obtain any valid responses from its AI cores for this query.]\n\n" + final_fused_output_content.split("\n\n",1)[-1]
if tier_memory_enabled:
if self.memory_bank:
try:
memory_metadata = {
"user_token_used_prefix": user_token[:3] + "***" if user_token and len(user_token) > 3 else "N/A (No Token)" if not user_token else user_token,
"tier_at_interaction": current_tier_name,
"persona_key": persona_key or "default", "fusion_mode_used": current_preset_name,
# timestamp_iso is now added within store_memory_async
"duration_seconds": round(time.time() - process_start_time, 3),
"active_models_queried": models_to_use_for_perspectives, # Models actually used for perspectives
"individual_model_outputs": individual_results_for_memory, # Detailed dicts
"synthesized_summary_text": synthesized_summary_text, # The AI-generated summary
"fused_response_length_chars": len(final_fused_output),
"successful_model_responses": successful_responses,
"total_models_queried": len(models_to_use_for_perspectives)
}
await self.memory_bank.store_memory_async(query=current_query_text, response=final_fused_output, metadata=memory_metadata)
except Exception as e_mem: logger.error(f"Failed to store fusion interaction in MemoryBank (Tier: '{current_tier_name}'): {e_mem}", exc_info=True)
else: logger.warning(f"MemoryBank not initialized. Skipping storage (Tier: '{current_tier_name}').")
else: logger.info(f"Memory storage disabled for tier '{current_tier_name}'. Skipping storage.")
total_processing_time = round(time.time() - process_start_time, 2)
logger.info(f"🧠 Fusion complete in {total_processing_time}s. Output len: {len(final_fused_output)}. Models used: {len(models_to_use_for_perspectives)} (Tier: {current_tier_name}).")
return final_fused_output
async def get_status_report(self) -> Dict[str, Any]:
report: Dict[str, Any] = {
"status": "Full Multi-Model Fusion Mode Active",
"fusion_engine_status": "Online" if self.model_manager and self.active_model_names_in_order and LLAMA_CPP_AVAILABLE else "Degraded/Offline",
"all_available_models": self.active_model_names_in_order,
"models_last_queried_for_perspectives": getattr(self, 'models_last_queried_for_perspectives', []),
"model_manager_status": "Online" if self.model_manager else "Offline/Init Failed",
"llama_cpp_backend_available": LLAMA_CPP_AVAILABLE,
"memory_bank_status": "Online" if self.memory_bank else "Offline/Init Failed"
}
if self.model_manager and hasattr(self.model_manager, 'get_loaded_model_stats'):
try: report["model_manager_runtime_stats"] = self.model_manager.get_loaded_model_stats()
except Exception as e: report["model_manager_runtime_stats"] = f"Error getting MM stats: {e}"
else: report["model_manager_runtime_stats"] = "ModelManager N/A for stats."
if self.memory_bank and hasattr(self.memory_bank, 'get_memory_stats'):
try: report["memory_bank_stats"] = await self.memory_bank.get_memory_stats()
except Exception as e: report["memory_bank_stats"] = f"Error getting MB stats: {e}"
else: report["memory_bank_stats"] = "MemoryBank N/A for stats."
return report
if __name__ == "__main__":
if os.getenv("ZOTHEOS_DEBUG_DEPS", "false").lower() != "true":
for lib_logger_name in ["torch", "huggingface_hub", "psutil", "llama_cpp", "httpx", "PIL"]: logging.getLogger(lib_logger_name).setLevel(logging.WARNING)
logger.setLevel(logging.DEBUG)
logger.info("--- MainFusionPublic (Tier Logic & Async Summary) CLI Test ---")
async def run_main_fusion_cli_test_with_token(test_token=None, token_desc="Default (Free Tier)"):
main_fusion_instance: Optional[MainFusionPublic] = None
logger.info(f"\n--- Testing with token: '{token_desc}' ---")
try:
main_fusion_instance = MainFusionPublic(device_preference="cuda")
if not main_fusion_instance.model_manager or not main_fusion_instance.active_model_names_in_order or not LLAMA_CPP_AVAILABLE:
logger.critical("CLI Test Aborted: MainFusion init failed (MM or LlamaCPP unavailable)."); return
test_query = "What are the core principles of Stoicism and how can they be applied in modern life?"
logger.info(f"CLI Test: Querying (Token: {test_token[:3] + '...' if test_token and len(test_token)>3 else test_token}): '{test_query}'")
response = await main_fusion_instance.process_query_with_fusion(query=test_query, user_token=test_token, persona_key="philosopher", fusion_mode_override="balanced")
print("\n" + "="*25 + f" CLI Test Response ({token_desc}) " + "="*25); print(response); print("="* (50 + len(f" CLI Test Response ({token_desc}) ") + 2))
status = await main_fusion_instance.get_status_report()
print("\nSystem Status Report After Query:"); print(json.dumps(status, indent=2, default=str))
except Exception as e: logger.critical(f"Error during CLI test ({token_desc}): {e}", exc_info=True); print(f"🚨 CLI Test Error ({token_desc}): {e}")
finally:
if main_fusion_instance and main_fusion_instance.model_manager and hasattr(main_fusion_instance.model_manager, 'shutdown'):
logger.info(f"CLI Test ({token_desc}): Shutting down ModelManager...")
main_fusion_instance.model_manager.shutdown() # This is synchronous
logger.info(f"🛑 MainFusion CLI test ({token_desc}) shutdown.")
async def run_all_cli_tests():
# Ensure stripe_users.json is in project root for this test to work with tokens
tokens_to_test = {
None: "No Token (Defaults to Free)",
"TOKEN_FOR_FREE_TEST": "Free Tier Token", # Add this to your stripe_users.json
"TOKEN_FOR_STARTER_TEST": "Starter Tier Token", # Add this
"TOKEN_FOR_PRO_TEST": "Pro Tier Token" # Add this
}
# Create dummy stripe_users.json if not exists for test
if not os.path.exists(os.path.join(project_root, "stripe_users.json")):
logger.warning("Creating dummy stripe_users.json for CLI test.")
dummy_users = {t: t.split('_')[1].lower() for t in tokens_to_test if t} # type: ignore
with open(os.path.join(project_root, "stripe_users.json"), "w") as f:
json.dump(dummy_users, f, indent=2)
for token, desc in tokens_to_test.items():
await run_main_fusion_cli_test_with_token(token, desc)
asyncio.run(run_all_cli_tests())