# FILE: modules/main_fusion_public.py (True Fusion, Verified Launch Version)
"""ZOTHEOS fusion engine.

Loads several local llama.cpp models, asks each of them the same question
under a role-specific system prompt, and then asks a designated summarizer
model to merge the individual answers into one synthesized insight.
"""

import asyncio
import logging
from typing import Dict, Any, Optional, List

# The configuration constants and the llama.cpp binding are both mandatory.
# A failed import is recorded (not raised) so that importing this module never
# crashes; MainFusionPublic.__init__ raises instead.
try:
    from modules.config_settings_public import (
        MODEL_PATHS,
        MODEL_SPECIFIC_PARAMS,
        INFERENCE_PRESETS,
        DEFAULT_INFERENCE_PRESET,
        MODEL_ROLES,
        MODEL_ROLE_SYSTEM_PROMPTS,
        DEFAULT_SYSTEM_PROMPT,
    )
    from llama_cpp import Llama
    LLAMA_CPP_AVAILABLE = True
except ImportError as e:
    # This is a critical failure, the app cannot run without these.
    logging.basicConfig(level=logging.CRITICAL)
    logging.critical(f"CRITICAL IMPORT ERROR in main_fusion_public.py: {e}. ZOTHEOS cannot function.")
    LLAMA_CPP_AVAILABLE = False

logger = logging.getLogger("ZOTHEOS_MainFusion")
if not logger.handlers:
    # Configure the logger only if nothing else has attached a handler yet.
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)


class MainFusionPublic:
    """
    The core engine for ZOTHEOS. It loads multiple AI models, queries them in parallel
    with distinct persona-based prompts, and then synthesizes their responses into a
    higher-order summary.
    """

    # Name (key in ``self.models``) of the model used for the synthesis step.
    SUMMARIZER_MODEL = "gemma"

    # System prompt applied to the summarizer model during the synthesis step.
    SYNTHESIS_SYSTEM_PROMPT = (
        "You are a master synthesis AI. Your role is to create a clear and insightful "
        "summary from the provided texts, acting as a final arbiter of truth."
    )

    def __init__(self):
        """Load every configured model.

        Raises:
            RuntimeError: if the module-level imports (config / llama_cpp) failed.
        """
        logger.info("🚀 ZOTHEOS MainFusion Initializing (True Fusion Mode)...")
        if not LLAMA_CPP_AVAILABLE:
            raise RuntimeError("Llama.cpp backend is not available. ZOTHEOS cannot start.")

        # model name -> loaded Llama instance, or None when loading failed.
        self.models: Dict[str, Optional["Llama"]] = {}
        self.config = {
            "MODEL_ROLES": MODEL_ROLES,
            "MODEL_ROLE_SYSTEM_PROMPTS": MODEL_ROLE_SYSTEM_PROMPTS,
        }
        self.models_last_queried_for_perspectives: List[str] = []
        self._initialize_models()

    def _initialize_models(self) -> None:
        """Loads all models defined in the config into memory.

        A model that fails to load is recorded as ``None`` in ``self.models``
        rather than aborting the whole start-up.
        """
        for model_name, model_path in MODEL_PATHS.items():
            try:
                logger.info(f"Loading model: {model_name}...")
                # Per-model settings override the shared '_default' settings.
                params = {
                    **MODEL_SPECIFIC_PARAMS['_default'],
                    **MODEL_SPECIFIC_PARAMS.get(model_name, {}),
                }
                self.models[model_name] = Llama(model_path=model_path, **params)
                logger.info(f"✅ Model '{model_name}' loaded successfully.")
            except Exception as e:
                logger.error(f"❌ Failed to load model '{model_name}': {e}", exc_info=True)
                self.models[model_name] = None

    async def _get_single_perspective(
        self,
        model_name: str,
        query: str,
        *,
        system_prompt: Optional[str] = None,
    ) -> str:
        """Queries a single model and returns its stripped response text.

        Args:
            model_name: Key of the model in ``self.models``.
            query: Text sent as the user message.
            system_prompt: Optional system prompt that replaces the one
                normally derived from the model's configured role.

        Returns:
            The model's answer, or a bracketed ``[Error ...]`` string when the
            model is unavailable or inference raised.
        """
        llm = self.models.get(model_name)
        if llm is None:
            logger.warning(f"Model '{model_name}' is not loaded or failed to initialize.")
            return f"[Error: The '{model_name}' AI core is offline.]"

        role = self.config["MODEL_ROLES"].get(model_name, "general")
        if system_prompt is None:
            system_prompt = self.config["MODEL_ROLE_SYSTEM_PROMPTS"].get(role, DEFAULT_SYSTEM_PROMPT)

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query},
        ]
        preset = INFERENCE_PRESETS.get(DEFAULT_INFERENCE_PRESET, {})

        try:
            logger.info(f"Querying {model_name} ({role})...")
            # Run the blocking call in a separate thread to not block the event loop
            response = await asyncio.to_thread(llm.create_chat_completion, messages=messages, **preset)
            content = response['choices'][0]['message']['content'].strip()
            logger.info(f"✅ Response received from {model_name}.")
            return content
        except Exception as e:
            logger.error(f"Error during inference for model '{model_name}': {e}", exc_info=True)
            return f"[Error during inference for '{model_name}'. See logs for details.]"

    async def _generate_synthesis(self, query: str, perspectives: Dict[str, str]) -> str:
        """Uses the summarizer model to merge all perspectives into one summary.

        Args:
            query: The original user question.
            perspectives: Mapping of model name to that model's answer.
                Answers starting with ``[Error`` are ignored.

        Returns:
            The synthesized text; a fixed fallback message when the summarizer
            is offline or no usable perspective exists; or the single usable
            perspective verbatim when there is only one.
        """
        summarizer_model = self.SUMMARIZER_MODEL
        if self.models.get(summarizer_model) is None:
            logger.warning(f"Summarizer model '{summarizer_model}' not available. Returning basic summary.")
            return "Multiple perspectives were gathered, but the synthesis AI core is offline. Please review the detailed perspectives below."

        valid_perspectives = {
            name: text
            for name, text in perspectives.items()
            if not text.startswith("[Error")
        }
        if not valid_perspectives:
            return "No valid perspectives could be generated to create a summary."
        if len(valid_perspectives) == 1:
            # Nothing to merge: hand back the only answer unchanged.
            return next(iter(valid_perspectives.values()))

        parts = [
            f"Original User Question: \"{query}\"\n\nSynthesize the following distinct perspectives into a single, cohesive, and insightful summary. Capture the core agreements, disagreements, and unique insights from each viewpoint. Your goal is to provide a higher-order understanding that integrates these different analyses.\n\n"
        ]
        for name, text in valid_perspectives.items():
            role = self.config["MODEL_ROLES"].get(name, "General")
            parts.append(f"--- PERSPECTIVE FROM {name.upper()} ({role.capitalize()}) ---\n{text}\n\n")
        parts.append("--- SYNTHESIZED INSIGHT ---\n")
        synthesis_prompt = "".join(parts)

        # The synthesis step must run under the synthesis system prompt, not
        # under the summarizer model's ordinary persona prompt.
        return await self._get_single_perspective(
            summarizer_model,
            synthesis_prompt,
            system_prompt=self.SYNTHESIS_SYSTEM_PROMPT,
        )

    def _format_final_output(self, summary: str, perspectives: Dict[str, str]) -> str:
        """Formats the final Markdown output for the Gradio interface.

        Args:
            summary: The synthesized insight shown at the top.
            perspectives: Mapping of model name to that model's answer, listed
                below the summary in iteration order.

        Returns:
            The Markdown document with surrounding whitespace stripped.
        """
        sections = [
            f"## ✨ ZOTHEOS Final Synthesized Insight ✨\n\n{summary}\n\n---\n\n### 💬 Detailed Individual Perspectives\n\n"
        ]
        for model_name, text in perspectives.items():
            role = self.config["MODEL_ROLES"].get(model_name, "General")
            sections.append(
                f"**Perspective from {model_name.capitalize()} ({role.capitalize()}):**\n{text}\n\n"
            )
        return "".join(sections).strip()

    async def process_query_with_fusion(self, query: str, **kwargs) -> str:
        """The main entry point for processing a user query with True Fusion.

        Args:
            query: The user's question.
            **kwargs: Accepted for caller compatibility; currently unused.

        Returns:
            The formatted Markdown result, or a bracketed critical-error
            string when no model is configured or any model failed to load.
        """
        if not self.models:
            # all() over an empty table is vacuously true, so check explicitly.
            return "[Critical Error: No AI cores are configured. Fusion is not possible.]"
        if any(model is None for model in self.models.values()):
            return "[Critical Error: Not all AI cores are online. Fusion is not possible.]"

        # For now, we use all configured models. Tier logic can be re-added here.
        models_to_query = list(self.models)
        self.models_last_queried_for_perspectives = models_to_query

        logger.info(f"Initiating True Fusion for query: '{query[:60]}...' using models: {models_to_query}")

        # Create and run all perspective-gathering tasks in parallel
        gathered_responses = await asyncio.gather(
            *(self._get_single_perspective(model_name, query) for model_name in models_to_query)
        )
        perspectives = dict(zip(models_to_query, gathered_responses))

        # Generate the final synthesis based on the gathered perspectives
        logger.info("All perspectives gathered. Generating final synthesis...")
        synthesis = await self._generate_synthesis(query, perspectives)

        # Format the final output for the user
        logger.info("Synthesis complete. Formatting final output.")
        return self._format_final_output(synthesis, perspectives)

    async def get_status_report(self) -> Dict[str, Any]:
        """Provides a status report for the system.

        Returns:
            A dict with a status label, the names of loaded and failed models,
            and the model names used by the most recent fusion query.
        """
        loaded = [name for name, model in self.models.items() if model is not None]
        failed = [name for name, model in self.models.items() if model is None]
        return {
            "status": "Online - True Fusion Mode",
            "loaded_models": loaded,
            "failed_models": failed,
            "last_queried_for_fusion": self.models_last_queried_for_perspectives,
        }