# FILE: modules/main_fusion_public.py (True Fusion, Verified Launch Version)

import asyncio
import logging
from typing import Dict, Any, Optional, List

# These imports will now work correctly with the verified config file
try:
    from modules.config_settings_public import (
        MODEL_PATHS,
        MODEL_SPECIFIC_PARAMS,
        INFERENCE_PRESETS,
        DEFAULT_INFERENCE_PRESET,
        MODEL_ROLES,
        MODEL_ROLE_SYSTEM_PROMPTS,
        DEFAULT_SYSTEM_PROMPT
    )
    from llama_cpp import Llama
    LLAMA_CPP_AVAILABLE = True
except ImportError as e:
    # This is a critical failure; the app cannot run without these imports.
    logging.basicConfig(level=logging.CRITICAL)
    logging.critical(f"CRITICAL IMPORT ERROR in main_fusion_public.py: {e}. ZOTHEOS cannot function.")
    LLAMA_CPP_AVAILABLE = False

logger = logging.getLogger("ZOTHEOS_MainFusion")
if not logger.handlers:
    # Configure the logger only if it hasn't been configured yet.
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

class MainFusionPublic:
    """
    The core engine for ZOTHEOS. It loads multiple AI models, queries them in parallel
    with distinct persona-based prompts, and then synthesizes their responses into a
    higher-order summary.
    """

    def __init__(self):
        logger.info("🚀 ZOTHEOS MainFusion Initializing (True Fusion Mode)...")
        if not LLAMA_CPP_AVAILABLE:
            raise RuntimeError("Llama.cpp backend is not available. ZOTHEOS cannot start.")

        self.models: Dict[str, Optional[Llama]] = {}
        self.config = {
            "MODEL_ROLES": MODEL_ROLES,
            "MODEL_ROLE_SYSTEM_PROMPTS": MODEL_ROLE_SYSTEM_PROMPTS,
        }
        self.models_last_queried_for_perspectives: List[str] = []
        self._initialize_models()
    def _initialize_models(self):
        """Loads all models defined in the config into memory."""
        for model_name, model_path in MODEL_PATHS.items():
            try:
                logger.info(f"Loading model: {model_name}...")
                # Start from the shared defaults, then apply any model-specific overrides.
                params = MODEL_SPECIFIC_PARAMS['_default'].copy()
                params.update(MODEL_SPECIFIC_PARAMS.get(model_name, {}))
                self.models[model_name] = Llama(model_path=model_path, **params)
                logger.info(f"✅ Model '{model_name}' loaded successfully.")
            except Exception as e:
                logger.error(f"❌ Failed to load model '{model_name}': {e}", exc_info=True)
                self.models[model_name] = None
    async def _get_single_perspective(self, model_name: str, query: str) -> str:
        """Queries a single model with its assigned role and returns the response."""
        if model_name not in self.models or self.models[model_name] is None:
            logger.warning(f"Model '{model_name}' is not loaded or failed to initialize.")
            return f"[Error: The '{model_name}' AI core is offline.]"

        role = self.config["MODEL_ROLES"].get(model_name, "general")
        system_prompt = self.config["MODEL_ROLE_SYSTEM_PROMPTS"].get(role, DEFAULT_SYSTEM_PROMPT)
        llm = self.models[model_name]
        messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": query}]
        preset = INFERENCE_PRESETS.get(DEFAULT_INFERENCE_PRESET, {})

        try:
            logger.info(f"Querying {model_name} ({role})...")
            # Run the blocking call in a separate thread to avoid blocking the event loop.
            response = await asyncio.to_thread(llm.create_chat_completion, messages=messages, **preset)
            content = response['choices'][0]['message']['content'].strip()
            logger.info(f"✅ Response received from {model_name}.")
            return content
        except Exception as e:
            logger.error(f"Error during inference for model '{model_name}': {e}", exc_info=True)
            return f"[Error during inference for '{model_name}'. See logs for details.]"
    async def _generate_synthesis(self, query: str, perspectives: Dict[str, str]) -> str:
        """Uses a designated model to synthesize a summary from all perspectives."""
        summarizer_model = "gemma"  # Gemma is good for this task
        if summarizer_model not in self.models or self.models[summarizer_model] is None:
            logger.warning(f"Summarizer model '{summarizer_model}' not available. Returning basic summary.")
            return "Multiple perspectives were gathered, but the synthesis AI core is offline. Please review the detailed perspectives below."

        valid_perspectives = {name: text for name, text in perspectives.items() if not text.startswith("[Error")}
        if not valid_perspectives:
            return "No valid perspectives could be generated to create a summary."
        if len(valid_perspectives) == 1:
            # With only one valid perspective there is nothing to synthesize; return it directly.
            return next(iter(valid_perspectives.values()))

        synthesis_prompt = f"Original User Question: \"{query}\"\n\nSynthesize the following distinct perspectives into a single, cohesive, and insightful summary. Capture the core agreements, disagreements, and unique insights from each viewpoint. Your goal is to provide a higher-order understanding that integrates these different analyses.\n\n"
        for name, text in valid_perspectives.items():
            role = self.config["MODEL_ROLES"].get(name, "General")
            synthesis_prompt += f"--- PERSPECTIVE FROM {name.upper()} ({role.capitalize()}) ---\n{text}\n\n"
        synthesis_prompt += "--- SYNTHESIZED INSIGHT ---\n"

        # Note: this system prompt is currently unused; _get_single_perspective applies the
        # summarizer model's own role prompt from MODEL_ROLE_SYSTEM_PROMPTS instead.
        summary_system_prompt = "You are a master synthesis AI. Your role is to create a clear and insightful summary from the provided texts, acting as a final arbiter of truth."
        return await self._get_single_perspective(summarizer_model, synthesis_prompt)
    def _format_final_output(self, summary: str, perspectives: Dict[str, str]) -> str:
        """Formats the final Markdown output for the Gradio interface."""
        output = f"## ✨ ZOTHEOS Final Synthesized Insight ✨\n\n{summary}\n\n---\n\n### 💬 Detailed Individual Perspectives\n\n"
        for model_name, text in perspectives.items():
            role = self.config["MODEL_ROLES"].get(model_name, "General")
            output += f"**Perspective from {model_name.capitalize()} ({role.capitalize()}):**\n{text}\n\n"
        return output.strip()
    async def process_query_with_fusion(self, query: str, **kwargs):
        """The main entry point for processing a user query with True Fusion."""
        if not all(self.models.values()):
            return "[Critical Error: Not all AI cores are online. Fusion is not possible.]"

        # For now, we use all configured models. Tier logic can be re-added here.
        models_to_query = list(self.models.keys())
        self.models_last_queried_for_perspectives = models_to_query
        logger.info(f"Initiating True Fusion for query: '{query[:60]}...' using models: {models_to_query}")

        # Create and run all perspective-gathering tasks in parallel.
        perspective_tasks = [self._get_single_perspective(model_name, query) for model_name in models_to_query]
        gathered_responses = await asyncio.gather(*perspective_tasks)
        perspectives = dict(zip(models_to_query, gathered_responses))

        # Generate the final synthesis based on the gathered perspectives.
        logger.info("All perspectives gathered. Generating final synthesis...")
        synthesis = await self._generate_synthesis(query, perspectives)

        # Format the final output for the user.
        logger.info("Synthesis complete. Formatting final output.")
        return self._format_final_output(synthesis, perspectives)
    async def get_status_report(self) -> Dict[str, Any]:
        """Provides a status report for the system."""
        return {
            "status": "Online - True Fusion Mode",
            "loaded_models": [name for name, model in self.models.items() if model is not None],
            "failed_models": [name for name, model in self.models.items() if model is None],
            "last_queried_for_fusion": self.models_last_queried_for_perspectives,
        }
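

# --- Illustrative usage sketch (not part of the verified launch file) ---
# A minimal, hedged example of how this engine might be driven from the command
# line. It assumes the model files referenced in MODEL_PATHS exist locally and
# uses a placeholder query; adjust both before running.
if __name__ == "__main__":
    async def _demo() -> None:
        engine = MainFusionPublic()
        logger.info(f"Status report: {await engine.get_status_report()}")
        # Placeholder question; any user query works here.
        result = await engine.process_query_with_fusion("What are the trade-offs of remote work?")
        print(result)

    asyncio.run(_demo())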