# FILE: modules/main_fusion_public.py (True Fusion, Verified Launch Version)

import asyncio
import logging
from typing import Dict, Any, Optional, List

# These imports will now work correctly with the verified config file
try:
    from modules.config_settings_public import (
        MODEL_PATHS,
        MODEL_SPECIFIC_PARAMS,
        INFERENCE_PRESETS,
        DEFAULT_INFERENCE_PRESET,
        MODEL_ROLES,
        MODEL_ROLE_SYSTEM_PROMPTS,
        DEFAULT_SYSTEM_PROMPT
    )
    from llama_cpp import Llama
    LLAMA_CPP_AVAILABLE = True
except ImportError as e:
    # This is a critical failure; the app cannot run without these imports.
    logging.basicConfig(level=logging.CRITICAL)
    logging.critical(f"CRITICAL IMPORT ERROR in main_fusion_public.py: {e}. ZOTHEOS cannot function.")
    LLAMA_CPP_AVAILABLE = False


logger = logging.getLogger("ZOTHEOS_MainFusion")
if not logger.handlers:
    # Configure logger if it hasn't been configured yet
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)


class MainFusionPublic:
    """
    The core engine for ZOTHEOS. It loads multiple AI models, queries them in parallel
    with distinct persona-based prompts, and then synthesizes their responses into a
    higher-order summary.
    """
    def __init__(self):
        logger.info("πŸš€ ZOTHEOS MainFusion Initializing (True Fusion Mode)...")
        if not LLAMA_CPP_AVAILABLE:
            raise RuntimeError("Llama.cpp backend is not available. ZOTHEOS cannot start.")

        self.models: Dict[str, Optional[Llama]] = {}
        self.config = {
            "MODEL_ROLES": MODEL_ROLES,
            "MODEL_ROLE_SYSTEM_PROMPTS": MODEL_ROLE_SYSTEM_PROMPTS,
        }
        self.models_last_queried_for_perspectives: List[str] = []

        self._initialize_models()

    def _initialize_models(self):
        """Loads all models defined in the config into memory."""
        for model_name, model_path in MODEL_PATHS.items():
            try:
                logger.info(f"Loading model: {model_name}...")
                # Guard against a missing '_default' entry rather than raising KeyError.
                params = MODEL_SPECIFIC_PARAMS.get('_default', {}).copy()
                params.update(MODEL_SPECIFIC_PARAMS.get(model_name, {}))

                self.models[model_name] = Llama(model_path=model_path, **params)
                logger.info(f"βœ… Model '{model_name}' loaded successfully.")
            except Exception as e:
                logger.error(f"❌ Failed to load model '{model_name}': {e}", exc_info=True)
                self.models[model_name] = None

    async def _get_single_perspective(self, model_name: str, query: str, system_prompt_override: Optional[str] = None) -> str:
        """Queries a single model with its assigned role (or an explicit system
        prompt override) and returns the response."""
        if self.models.get(model_name) is None:
            logger.warning(f"Model '{model_name}' is not loaded or failed to initialize.")
            return f"[Error: The '{model_name}' AI core is offline.]"

        role = self.config["MODEL_ROLES"].get(model_name, "general")
        system_prompt = system_prompt_override or self.config["MODEL_ROLE_SYSTEM_PROMPTS"].get(role, DEFAULT_SYSTEM_PROMPT)
        
        llm = self.models[model_name]
        messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": query}]
        preset = INFERENCE_PRESETS.get(DEFAULT_INFERENCE_PRESET, {})
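        # Preset entries are forwarded verbatim as keyword arguments to
        # create_chat_completion, e.g. (illustrative values only):
        #   {"temperature": 0.7, "top_p": 0.9, "max_tokens": 1024}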

        try:
            logger.info(f"Querying {model_name} ({role})...")
            # Run the blocking llama.cpp call in a worker thread so it doesn't block the event loop.
            response = await asyncio.to_thread(llm.create_chat_completion, messages=messages, **preset)
            content = response['choices'][0]['message']['content'].strip()
            logger.info(f"βœ… Response received from {model_name}.")
            return content
        except Exception as e:
            logger.error(f"Error during inference for model '{model_name}': {e}", exc_info=True)
            return f"[Error during inference for '{model_name}'. See logs for details.]"

    async def _generate_synthesis(self, query: str, perspectives: Dict[str, str]) -> str:
        """Uses a designated model to synthesize a summary from all perspectives."""
        summarizer_model = "gemma"  # Designated synthesis model; must match a key in MODEL_PATHS.
        if self.models.get(summarizer_model) is None:
            logger.warning(f"Summarizer model '{summarizer_model}' not available. Returning basic summary.")
            return "Multiple perspectives were gathered, but the synthesis AI core is offline. Please review the detailed perspectives below."

        valid_perspectives = {name: text for name, text in perspectives.items() if not text.startswith("[Error")}
        if not valid_perspectives:
            return "No valid perspectives could be generated to create a summary."
        if len(valid_perspectives) == 1:
            return next(iter(valid_perspectives.values()))

        synthesis_prompt = (
            f"Original User Question: \"{query}\"\n\n"
            "Synthesize the following distinct perspectives into a single, cohesive, and insightful summary. "
            "Capture the core agreements, disagreements, and unique insights from each viewpoint. "
            "Your goal is to provide a higher-order understanding that integrates these different analyses.\n\n"
        )
        for name, text in valid_perspectives.items():
            role = self.config["MODEL_ROLES"].get(name, "General")
            synthesis_prompt += f"--- PERSPECTIVE FROM {name.upper()} ({role.capitalize()}) ---\n{text}\n\n"
        synthesis_prompt += "--- SYNTHESIZED INSIGHT ---\n"

        summary_system_prompt = "You are a master synthesis AI. Your role is to create a clear and insightful summary from the provided texts, acting as a final arbiter of truth."
        # Pass the synthesis persona explicitly so it overrides the summarizer's default role prompt.
        return await self._get_single_perspective(summarizer_model, synthesis_prompt, system_prompt_override=summary_system_prompt)

    def _format_final_output(self, summary: str, perspectives: Dict[str, str]) -> str:
        """Formats the final Markdown output for the Gradio interface."""
        output = f"## ✨ ZOTHEOS Final Synthesized Insight ✨\n\n{summary}\n\n---\n\n### πŸ’¬ Detailed Individual Perspectives\n\n"
        for model_name, text in perspectives.items():
            role = self.config["MODEL_ROLES"].get(model_name, "General")
            output += f"**Perspective from {model_name.capitalize()} ({role.capitalize()}):**\n{text}\n\n"
        return output.strip()

    async def process_query_with_fusion(self, query: str, **kwargs) -> str:
        """The main entry point for processing a user query with True Fusion."""
        # Note: all({}) is vacuously True, so also guard against an empty model dict.
        if not self.models or not all(self.models.values()):
            return "[Critical Error: Not all AI cores are online. Fusion is not possible.]"

        # For now, we use all configured models. Tier logic can be re-added here.
        models_to_query = list(self.models.keys())
        self.models_last_queried_for_perspectives = models_to_query
        
        logger.info(f"Initiating True Fusion for query: '{query[:60]}...' using models: {models_to_query}")
        
        # Create and run all perspective-gathering tasks in parallel
        perspective_tasks = [self._get_single_perspective(model_name, query) for model_name in models_to_query]
        gathered_responses = await asyncio.gather(*perspective_tasks)
        perspectives = dict(zip(models_to_query, gathered_responses))

        # Generate the final synthesis based on the gathered perspectives
        logger.info("All perspectives gathered. Generating final synthesis...")
        synthesis = await self._generate_synthesis(query, perspectives)

        # Format the final output for the user
        logger.info("Synthesis complete. Formatting final output.")
        return self._format_final_output(synthesis, perspectives)

    async def get_status_report(self) -> Dict[str, Any]:
        """Provides a status report for the system."""
        return {
            "status": "Online - True Fusion Mode",
            "loaded_models": [name for name, model in self.models.items() if model is not None],
            "failed_models": [name for name, model in self.models.items() if model is None],
            "last_queried_for_fusion": self.models_last_queried_for_perspectives,
        }
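

# --- Usage sketch (illustrative; not part of the module's public surface) ---
# A minimal smoke test, assuming config_settings_public.py resolves valid local
# GGUF model paths. The example query is hypothetical.
if __name__ == "__main__":
    async def _demo() -> None:
        engine = MainFusionPublic()
        print(await engine.get_status_report())
        print(await engine.process_query_with_fusion("What are the trade-offs of remote work?"))

    asyncio.run(_demo())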