# FILE: modules/main_fusion_public.py (Hugging Face Demo)

import asyncio
import logging
import json
from typing import Dict, Any, Optional, List

try:
    from modules.config_settings_public import (
        MODEL_PATHS, MODEL_SPECIFIC_PARAMS, INFERENCE_PRESETS,
        DEFAULT_INFERENCE_PRESET, MODEL_ROLES, MODEL_ROLE_SYSTEM_PROMPTS,
        DEFAULT_SYSTEM_PROMPT
    )
    from llama_cpp import Llama
    LLAMA_CPP_AVAILABLE = True
except ImportError as e:
    logging.basicConfig(level=logging.CRITICAL)
    logging.critical(f"CRITICAL HF IMPORT ERROR: {e}.")
    LLAMA_CPP_AVAILABLE = False

logger = logging.getLogger("ZOTHEOS_MainFusion_HF")


# --- ✅ Simple In-Memory MemoryBank for the Web Demo ---
class MemoryBank:
    """A simple, non-persistent memory bank for the web demo."""

    def __init__(self):
        self.memories: List[Dict[str, Any]] = []

    async def store_memory_async(self, query: str, response: str, metadata: Optional[Dict[str, Any]] = None):
        entry = {'query': query, 'response': response, 'metadata': metadata or {}}
        self.memories.append(entry)
        if len(self.memories) > 10:
            self.memories.pop(0)  # Keep memory from growing too large

    async def retrieve_recent_memories_async(self, limit: int = 5) -> List[Dict[str, Any]]:
        return self.memories[-limit:]

    async def get_all_memories_for_export_async(self) -> List[Dict[str, Any]]:
        return list(self.memories)  # Return a copy


# --- ✅ The AGI-Tier Engine, Optimized for Web ---
class MainFusionPublic:
    def __init__(self):
        logger.info("🚀 ZOTHEOS HF Demo Engine Initializing...")
        if not LLAMA_CPP_AVAILABLE:
            raise RuntimeError("Llama.cpp backend is not available.")
        self.models: Dict[str, Optional[Llama]] = {}
        self.config = {
            "MODEL_ROLES": MODEL_ROLES,
            "MODEL_ROLE_SYSTEM_PROMPTS": MODEL_ROLE_SYSTEM_PROMPTS,
        }
        self.models_last_queried: List[str] = []
        self.memory_bank = MemoryBank()
        self._initialize_models()

    def _initialize_models(self):
        for name, path in MODEL_PATHS.items():
            try:
                logger.info(f"HF Demo: Loading model {name}...")
                params = MODEL_SPECIFIC_PARAMS.get('_default', {}).copy()
                params.update(MODEL_SPECIFIC_PARAMS.get(name, {}))
                self.models[name] = Llama(model_path=path, **params)
                logger.info(f"✅ HF Demo: Model '{name}' loaded.")
            except Exception as e:
                logger.error(f"❌ HF Demo: Failed to load model '{name}': {e}", exc_info=True)
                self.models[name] = None

    async def _get_perspective(self, model_name: str, query: str, system_prompt: str) -> str:
        llm = self.models.get(model_name)
        if not llm:
            return f"[Error: '{model_name}' core is offline.]"
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query},
        ]
        preset = INFERENCE_PRESETS.get(DEFAULT_INFERENCE_PRESET, {})
        try:
            response = await asyncio.to_thread(llm.create_chat_completion, messages=messages, **preset)
            return response['choices'][0]['message']['content'].strip()
        except Exception as e:
            logger.error(f"HF Demo Inference error for '{model_name}': {e}")
            return f"[Error during inference for '{model_name}'.]"

    async def _generate_true_synthesis(self, query: str, perspectives: Dict[str, str]) -> str:
        summarizer_model = "gemma"
        if summarizer_model not in self.models or not self.models[summarizer_model]:
            return "Synthesis AI core is offline."
        valid_perspectives = {k: v for k, v in perspectives.items() if not v.startswith("[Error")}
        if not valid_perspectives:
            return "No valid perspectives were generated."
        synthesis_prompt = f"""
As the master intelligence of ZOTHEOS, perform a high-level synthesis of these perspectives on the user's query: "{query}".
Your goal is to find the deeper truth. Analyze the viewpoints to:
1. Identify the core theme.
2. Highlight the most significant tension between them.
3. Extract a unique "aha!" insight that emerges from their combination.
4. Conclude with a definitive, balanced statement.

Perspectives:
{json.dumps(valid_perspectives, indent=2)}

Your Final Synthesized Insight:
"""
        system_prompt = "You are a master synthesis AI. Create a clear, insightful summary from the provided texts."
        return await self._get_perspective(summarizer_model, synthesis_prompt, system_prompt)

    def _format_output(self, summary: str, perspectives: Dict[str, str]) -> str:
        output = f"## ✨ ZOTHEOS Final Synthesized Insight ✨\n\n{summary}\n\n### 💬 Detailed Individual Perspectives\n\n"
        for name, text in perspectives.items():
            role = self.config["MODEL_ROLES"].get(name, "General")
            output += f"**Perspective from {name.capitalize()} ({role.capitalize()}):**\n{text}\n\n"
        return output.strip()

    async def process_query_with_fusion(self, query: str, **kwargs) -> str:
        online_models = [name for name, model in self.models.items() if model]
        if not online_models:
            return "[Critical Error: All AI cores are offline.]"
        self.models_last_queried = online_models
        tasks = {
            name: self._get_perspective(
                name,
                query,
                self.config["MODEL_ROLE_SYSTEM_PROMPTS"].get(
                    self.config["MODEL_ROLES"].get(name, "general"),
                    DEFAULT_SYSTEM_PROMPT,
                ),
            )
            for name in online_models
        }
        responses = await asyncio.gather(*tasks.values())
        perspectives = dict(zip(tasks.keys(), responses))
        synthesis = await self._generate_true_synthesis(query, perspectives)
        final_output = self._format_output(synthesis, perspectives)
        if self.memory_bank:
            await self.memory_bank.store_memory_async(query=query, response=final_output)
        return final_output

    async def get_status_report(self) -> Dict[str, Any]:
        return {
            "status": "Online - Web Demo Mode",
            "loaded_models": [name for name, model in self.models.items() if model is not None],
        }
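

# --- Usage sketch (illustrative addition, not part of the original module) ---
# A minimal smoke test showing how the engine is driven end to end: construct
# MainFusionPublic, check which cores loaded, then run one fused query. It
# assumes the GGUF files referenced by MODEL_PATHS exist on disk and that the
# script is run from the project root so `modules.config_settings_public`
# resolves. The `_demo` coroutine and the sample query are hypothetical names
# introduced here for demonstration only.
if __name__ == "__main__":
    async def _demo():
        engine = MainFusionPublic()  # Loads all configured models eagerly
        status = await engine.get_status_report()
        print(f"Status: {status}")
        # One fused round trip: per-model perspectives, then synthesis.
        answer = await engine.process_query_with_fusion("What is the nature of truth?")
        print(answer)

    asyncio.run(_demo())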