# FILE: modules/main_fusion_public.py (True Fusion, Verified Launch Version)

import asyncio
import logging
from typing import Dict, Any, Optional, List

# These imports will now work correctly with the verified config file.
try:
    from modules.config_settings_public import (
        MODEL_PATHS,
        MODEL_SPECIFIC_PARAMS,
        INFERENCE_PRESETS,
        DEFAULT_INFERENCE_PRESET,
        MODEL_ROLES,
        MODEL_ROLE_SYSTEM_PROMPTS,
        DEFAULT_SYSTEM_PROMPT
    )
    from llama_cpp import Llama
    LLAMA_CPP_AVAILABLE = True
except ImportError as e:
    # This is a critical failure; the app cannot run without these.
    logging.basicConfig(level=logging.CRITICAL)
    logging.critical(f"CRITICAL IMPORT ERROR in main_fusion_public.py: {e}. ZOTHEOS cannot function.")
    LLAMA_CPP_AVAILABLE = False

logger = logging.getLogger("ZOTHEOS_MainFusion")
if not logger.handlers:
    # Configure the logger if it has not been configured yet.
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

class MainFusionPublic:
    """
    The core engine for ZOTHEOS. It loads multiple AI models, queries them in parallel
    with distinct persona-based prompts, and then synthesizes their responses into a
    higher-order summary.
    """

    def __init__(self):
        logger.info("ZOTHEOS MainFusion Initializing (True Fusion Mode)...")
        if not LLAMA_CPP_AVAILABLE:
            raise RuntimeError("Llama.cpp backend is not available. ZOTHEOS cannot start.")

        self.models: Dict[str, Optional[Llama]] = {}
        self.config = {
            "MODEL_ROLES": MODEL_ROLES,
            "MODEL_ROLE_SYSTEM_PROMPTS": MODEL_ROLE_SYSTEM_PROMPTS,
        }
        self.models_last_queried_for_perspectives: List[str] = []
        self._initialize_models()

    def _initialize_models(self):
        """Loads all models defined in the config into memory."""
        for model_name, model_path in MODEL_PATHS.items():
            try:
                logger.info(f"Loading model: {model_name}...")
                # Start from the shared defaults, then overlay any model-specific overrides.
                params = MODEL_SPECIFIC_PARAMS.get('_default', {}).copy()
                params.update(MODEL_SPECIFIC_PARAMS.get(model_name, {}))
                self.models[model_name] = Llama(model_path=model_path, **params)
                logger.info(f"Model '{model_name}' loaded successfully.")
            except Exception as e:
                logger.error(f"Failed to load model '{model_name}': {e}", exc_info=True)
                self.models[model_name] = None

    async def _get_single_perspective(self, model_name: str, query: str, system_prompt_override: Optional[str] = None) -> str:
        """Queries a single model with its assigned role (or an explicit system prompt) and returns the response."""
        if model_name not in self.models or self.models[model_name] is None:
            logger.warning(f"Model '{model_name}' is not loaded or failed to initialize.")
            return f"[Error: The '{model_name}' AI core is offline.]"

        role = self.config["MODEL_ROLES"].get(model_name, "general")
        system_prompt = system_prompt_override or self.config["MODEL_ROLE_SYSTEM_PROMPTS"].get(role, DEFAULT_SYSTEM_PROMPT)
        llm = self.models[model_name]
        messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": query}]
        preset = INFERENCE_PRESETS.get(DEFAULT_INFERENCE_PRESET, {})

        try:
            logger.info(f"Querying {model_name} ({role})...")
            # Run the blocking llama.cpp call in a separate thread so the event loop stays responsive.
            response = await asyncio.to_thread(llm.create_chat_completion, messages=messages, **preset)
            content = response['choices'][0]['message']['content'].strip()
            logger.info(f"Response received from {model_name}.")
            return content
        except Exception as e:
            logger.error(f"Error during inference for model '{model_name}': {e}", exc_info=True)
            return f"[Error during inference for '{model_name}'. See logs for details.]"

    async def _generate_synthesis(self, query: str, perspectives: Dict[str, str]) -> str:
        """Uses a designated model to synthesize a summary from all perspectives."""
        summarizer_model = "gemma"  # Gemma is well suited to this task.
        if summarizer_model not in self.models or self.models[summarizer_model] is None:
            logger.warning(f"Summarizer model '{summarizer_model}' not available. Returning basic summary.")
            return "Multiple perspectives were gathered, but the synthesis AI core is offline. Please review the detailed perspectives below."

        valid_perspectives = {name: text for name, text in perspectives.items() if not text.startswith("[Error")}
        if not valid_perspectives:
            return "No valid perspectives could be generated to create a summary."
        if len(valid_perspectives) == 1:
            return next(iter(valid_perspectives.values()))

        synthesis_prompt = f"Original User Question: \"{query}\"\n\nSynthesize the following distinct perspectives into a single, cohesive, and insightful summary. Capture the core agreements, disagreements, and unique insights from each viewpoint. Your goal is to provide a higher-order understanding that integrates these different analyses.\n\n"
        for name, text in valid_perspectives.items():
            role = self.config["MODEL_ROLES"].get(name, "General")
            synthesis_prompt += f"--- PERSPECTIVE FROM {name.upper()} ({role.capitalize()}) ---\n{text}\n\n"
        synthesis_prompt += "--- SYNTHESIZED INSIGHT ---\n"

        summary_system_prompt = "You are a master synthesis AI. Your role is to create a clear and insightful summary from the provided texts, acting as a final arbiter of truth."
        # Pass the synthesis-specific system prompt explicitly so it overrides the summarizer's default role prompt.
        return await self._get_single_perspective(summarizer_model, synthesis_prompt, system_prompt_override=summary_system_prompt)

    def _format_final_output(self, summary: str, perspectives: Dict[str, str]) -> str:
        """Formats the final Markdown output for the Gradio interface."""
        output = f"## ZOTHEOS Final Synthesized Insight\n\n{summary}\n\n---\n\n### Detailed Individual Perspectives\n\n"
        for model_name, text in perspectives.items():
            role = self.config["MODEL_ROLES"].get(model_name, "General")
            output += f"**Perspective from {model_name.capitalize()} ({role.capitalize()}):**\n{text}\n\n"
        return output.strip()

    async def process_query_with_fusion(self, query: str, **kwargs) -> str:
        """The main entry point for processing a user query with True Fusion."""
        if not all(self.models.values()):
            return "[Critical Error: Not all AI cores are online. Fusion is not possible.]"

        # For now, we use all configured models. Tier logic can be re-added here.
        models_to_query = list(self.models.keys())
        self.models_last_queried_for_perspectives = models_to_query
        logger.info(f"Initiating True Fusion for query: '{query[:60]}...' using models: {models_to_query}")

        # Create and run all perspective-gathering tasks in parallel.
        perspective_tasks = [self._get_single_perspective(model_name, query) for model_name in models_to_query]
        gathered_responses = await asyncio.gather(*perspective_tasks)
        perspectives = dict(zip(models_to_query, gathered_responses))

        # Generate the final synthesis based on the gathered perspectives.
        logger.info("All perspectives gathered. Generating final synthesis...")
        synthesis = await self._generate_synthesis(query, perspectives)

        # Format the final output for the user.
        logger.info("Synthesis complete. Formatting final output.")
        return self._format_final_output(synthesis, perspectives)

    async def get_status_report(self) -> Dict[str, Any]:
        """Provides a status report for the system."""
        return {
            "status": "Online - True Fusion Mode",
            "loaded_models": [name for name, model in self.models.items() if model is not None],
            "failed_models": [name for name, model in self.models.items() if model is None],
            "last_queried_for_fusion": self.models_last_queried_for_perspectives,
        }
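

# --- Usage sketch (illustrative, not part of the verified launch file) ---
# A minimal, hedged example of driving the engine end to end, assuming the
# config module and the GGUF model files it references exist locally.
# The example question below is hypothetical.
if __name__ == "__main__":
    async def _demo() -> None:
        engine = MainFusionPublic()
        status = await engine.get_status_report()
        logger.info(f"Status report: {status}")
        result = await engine.process_query_with_fusion(
            "What are the trade-offs between solar and nuclear power?"
        )
        print(result)

    asyncio.run(_demo())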