# FILE: modules/main_fusion_public.py (True Fusion, Verified Launch Version)
import asyncio
import logging
from typing import Dict, Any, Optional, List
# These imports will now work correctly with the verified config file
try:
    from modules.config_settings_public import (
        MODEL_PATHS,
        MODEL_SPECIFIC_PARAMS,
        INFERENCE_PRESETS,
        DEFAULT_INFERENCE_PRESET,
        MODEL_ROLES,
        MODEL_ROLE_SYSTEM_PROMPTS,
        DEFAULT_SYSTEM_PROMPT
    )
    from llama_cpp import Llama
    LLAMA_CPP_AVAILABLE = True
except ImportError as e:
    # This is a critical failure; the app cannot run without these modules.
    logging.basicConfig(level=logging.CRITICAL)
    logging.critical(f"CRITICAL IMPORT ERROR in main_fusion_public.py: {e}. ZOTHEOS cannot function.")
    LLAMA_CPP_AVAILABLE = False
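# Expected shape of the imported config (illustrative values only; the real
# definitions live in modules/config_settings_public.py and may differ):
#   MODEL_PATHS          = {"mistral": "models/mistral.gguf", "gemma": "models/gemma.gguf"}
#   MODEL_SPECIFIC_PARAMS = {"_default": {"n_ctx": 4096, "n_gpu_layers": -1}, "gemma": {"n_ctx": 8192}}
#   MODEL_ROLES          = {"mistral": "analyst", "gemma": "humanist"}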
logger = logging.getLogger("ZOTHEOS_MainFusion")
if not logger.handlers:
    # Configure the logger if it has not been configured yet.
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
class MainFusionPublic:
    """
    The core engine for ZOTHEOS. It loads multiple AI models, queries them in parallel
    with distinct persona-based prompts, and then synthesizes their responses into a
    higher-order summary.
    """

    def __init__(self):
        logger.info("🚀 ZOTHEOS MainFusion Initializing (True Fusion Mode)...")
        if not LLAMA_CPP_AVAILABLE:
            raise RuntimeError("Llama.cpp backend is not available. ZOTHEOS cannot start.")
        self.models: Dict[str, Optional[Llama]] = {}
        self.config = {
            "MODEL_ROLES": MODEL_ROLES,
            "MODEL_ROLE_SYSTEM_PROMPTS": MODEL_ROLE_SYSTEM_PROMPTS,
        }
        self.models_last_queried_for_perspectives: List[str] = []
        self._initialize_models()
    def _initialize_models(self):
        """Loads all models defined in the config into memory."""
        for model_name, model_path in MODEL_PATHS.items():
            try:
                logger.info(f"Loading model: {model_name}...")
                # Start from the shared defaults, then overlay any model-specific overrides.
                params = MODEL_SPECIFIC_PARAMS.get('_default', {}).copy()
                params.update(MODEL_SPECIFIC_PARAMS.get(model_name, {}))
                self.models[model_name] = Llama(model_path=model_path, **params)
                logger.info(f"✅ Model '{model_name}' loaded successfully.")
            except Exception as e:
                logger.error(f"❌ Failed to load model '{model_name}': {e}", exc_info=True)
                self.models[model_name] = None
    async def _get_single_perspective(self, model_name: str, query: str, system_prompt_override: Optional[str] = None) -> str:
        """Queries a single model with its assigned role (or an override prompt) and returns the response."""
        if model_name not in self.models or self.models[model_name] is None:
            logger.warning(f"Model '{model_name}' is not loaded or failed to initialize.")
            return f"[Error: The '{model_name}' AI core is offline.]"
        role = self.config["MODEL_ROLES"].get(model_name, "general")
        system_prompt = system_prompt_override or self.config["MODEL_ROLE_SYSTEM_PROMPTS"].get(role, DEFAULT_SYSTEM_PROMPT)
        llm = self.models[model_name]
        messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": query}]
        preset = INFERENCE_PRESETS.get(DEFAULT_INFERENCE_PRESET, {})
        try:
            logger.info(f"Querying {model_name} ({role})...")
            # Run the blocking llama.cpp call in a worker thread so the event loop stays responsive.
            response = await asyncio.to_thread(llm.create_chat_completion, messages=messages, **preset)
            content = (response['choices'][0]['message'].get('content') or '').strip()
            logger.info(f"✅ Response received from {model_name}.")
            return content
        except Exception as e:
            logger.error(f"Error during inference for model '{model_name}': {e}", exc_info=True)
            return f"[Error during inference for '{model_name}'. See logs for details.]"
    async def _generate_synthesis(self, query: str, perspectives: Dict[str, str]) -> str:
        """Uses a designated model to synthesize a summary from all perspectives."""
        summarizer_model = "gemma"  # Gemma handles this synthesis role well.
        if summarizer_model not in self.models or self.models[summarizer_model] is None:
            logger.warning(f"Summarizer model '{summarizer_model}' is not available. Returning a basic summary.")
            return "Multiple perspectives were gathered, but the synthesis AI core is offline. Please review the detailed perspectives below."
        valid_perspectives = {name: text for name, text in perspectives.items() if not text.startswith("[Error")}
        if not valid_perspectives:
            return "No valid perspectives could be generated to create a summary."
        if len(valid_perspectives) == 1:
            # With a single valid perspective there is nothing to fuse; return it directly.
            return next(iter(valid_perspectives.values()))
        synthesis_prompt = f"Original User Question: \"{query}\"\n\nSynthesize the following distinct perspectives into a single, cohesive, and insightful summary. Capture the core agreements, disagreements, and unique insights from each viewpoint. Your goal is to provide a higher-order understanding that integrates these different analyses.\n\n"
        for name, text in valid_perspectives.items():
            role = self.config["MODEL_ROLES"].get(name, "General")
            synthesis_prompt += f"--- PERSPECTIVE FROM {name.upper()} ({role.capitalize()}) ---\n{text}\n\n"
        synthesis_prompt += "--- SYNTHESIZED INSIGHT ---\n"
        summary_system_prompt = "You are a master synthesis AI. Your role is to create a clear and insightful summary from the provided texts, acting as a final arbiter of truth."
        # Pass the synthesis prompt explicitly; without the override, the summarizer's
        # role prompt would be used and summary_system_prompt would go unused.
        return await self._get_single_perspective(summarizer_model, synthesis_prompt, system_prompt_override=summary_system_prompt)
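    # Illustrative shape of the assembled synthesis prompt (hypothetical model
    # names and roles):
    #   Original User Question: "..."
    #   --- PERSPECTIVE FROM MISTRAL (Analyst) ---
    #   ...
    #   --- PERSPECTIVE FROM GEMMA (Humanist) ---
    #   ...
    #   --- SYNTHESIZED INSIGHT ---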
    def _format_final_output(self, summary: str, perspectives: Dict[str, str]) -> str:
        """Formats the final Markdown output for the Gradio interface."""
        output = f"## ✨ ZOTHEOS Final Synthesized Insight ✨\n\n{summary}\n\n---\n\n### 💬 Detailed Individual Perspectives\n\n"
        for model_name, text in perspectives.items():
            role = self.config["MODEL_ROLES"].get(model_name, "General")
            output += f"**Perspective from {model_name.capitalize()} ({role.capitalize()}):**\n{text}\n\n"
        return output.strip()
    async def process_query_with_fusion(self, query: str, **kwargs) -> str:
        """The main entry point for processing a user query with True Fusion."""
        if not any(self.models.values()):
            # Only abort when every core failed to load; individual failures
            # degrade gracefully in the per-model and synthesis steps below.
            return "[Critical Error: No AI cores are online. Fusion is not possible.]"
        # For now, query all configured models. Tier logic can be re-added here.
        models_to_query = list(self.models.keys())
        self.models_last_queried_for_perspectives = models_to_query
        logger.info(f"Initiating True Fusion for query: '{query[:60]}...' using models: {models_to_query}")
        # Create and run all perspective-gathering tasks in parallel.
        perspective_tasks = [self._get_single_perspective(model_name, query) for model_name in models_to_query]
        gathered_responses = await asyncio.gather(*perspective_tasks)
        perspectives = dict(zip(models_to_query, gathered_responses))
        # Generate the final synthesis from the gathered perspectives.
        logger.info("All perspectives gathered. Generating final synthesis...")
        synthesis = await self._generate_synthesis(query, perspectives)
        # Format the final output for the user.
        logger.info("Synthesis complete. Formatting final output.")
        return self._format_final_output(synthesis, perspectives)
    async def get_status_report(self) -> Dict[str, Any]:
        """Provides a status report for the system."""
        return {
            "status": "Online - True Fusion Mode",
            "loaded_models": [name for name, model in self.models.items() if model is not None],
            "failed_models": [name for name, model in self.models.items() if model is None],
            "last_queried_for_fusion": self.models_last_queried_for_perspectives,
        }
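
# --- Usage sketch (hypothetical; not part of the deployed app) ---
# A minimal smoke test showing how the engine is driven end to end. It assumes
# the GGUF files referenced by MODEL_PATHS exist locally and that enough
# RAM/VRAM is available to load every core; the query string is illustrative.
if __name__ == "__main__":
    async def _demo() -> None:
        engine = MainFusionPublic()  # Loads all configured models; may take a while.
        print(await engine.get_status_report())
        answer = await engine.process_query_with_fusion("What are the trade-offs of nuclear power?")
        print(answer)

    asyncio.run(_demo())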