ZOTHEOS committed (verified)
Commit 3eb16fc · Parent(s): 4d5f326

Update modules/main_fusion_public.py

Files changed (1):
  1. modules/main_fusion_public.py +87 -115
modules/main_fusion_public.py CHANGED
@@ -1,156 +1,128 @@
- # FILE: modules/main_fusion_public.py (True Fusion, Verified Launch Version)

  import asyncio
  import logging
  from typing import Dict, Any, Optional, List

- # These imports will now work correctly with the verified config file
  try:
      from modules.config_settings_public import (
-         MODEL_PATHS,
-         MODEL_SPECIFIC_PARAMS,
-         INFERENCE_PRESETS,
-         DEFAULT_INFERENCE_PRESET,
-         MODEL_ROLES,
-         MODEL_ROLE_SYSTEM_PROMPTS,
-         DEFAULT_SYSTEM_PROMPT
      )
      from llama_cpp import Llama
      LLAMA_CPP_AVAILABLE = True
  except ImportError as e:
-     # This is a critical failure; the app cannot run without these.
      logging.basicConfig(level=logging.CRITICAL)
-     logging.critical(f"CRITICAL IMPORT ERROR in main_fusion_public.py: {e}. ZOTHEOS cannot function.")
      LLAMA_CPP_AVAILABLE = False

- logger = logging.getLogger("ZOTHEOS_MainFusion")
- if not logger.handlers:
-     # Configure the logger if it hasn't been configured yet
-     handler = logging.StreamHandler()
-     formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
-     handler.setFormatter(formatter)
-     logger.addHandler(handler)
-     logger.setLevel(logging.INFO)

  class MainFusionPublic:
-     """
-     The core engine for ZOTHEOS. It loads multiple AI models, queries them in parallel
-     with distinct persona-based prompts, and then synthesizes their responses into a
-     higher-order summary.
-     """
      def __init__(self):
-         logger.info("🚀 ZOTHEOS MainFusion Initializing (True Fusion Mode)...")
          if not LLAMA_CPP_AVAILABLE:
-             raise RuntimeError("Llama.cpp backend is not available. ZOTHEOS cannot start.")

          self.models: Dict[str, Optional[Llama]] = {}
-         self.config = {
-             "MODEL_ROLES": MODEL_ROLES,
-             "MODEL_ROLE_SYSTEM_PROMPTS": MODEL_ROLE_SYSTEM_PROMPTS,
-         }
-         self.models_last_queried_for_perspectives: List[str] = []
          self._initialize_models()

      def _initialize_models(self):
-         """Loads all models defined in the config into memory."""
-         for model_name, model_path in MODEL_PATHS.items():
              try:
-                 logger.info(f"Loading model: {model_name}...")
-                 params = MODEL_SPECIFIC_PARAMS['_default'].copy()
-                 params.update(MODEL_SPECIFIC_PARAMS.get(model_name, {}))
-                 self.models[model_name] = Llama(model_path=model_path, **params)
-                 logger.info(f"✅ Model '{model_name}' loaded successfully.")
              except Exception as e:
-                 logger.error(f"❌ Failed to load model '{model_name}': {e}", exc_info=True)
-                 self.models[model_name] = None

-     async def _get_single_perspective(self, model_name: str, query: str) -> str:
-         """Queries a single model with its assigned role and returns the response."""
-         if model_name not in self.models or self.models[model_name] is None:
-             logger.warning(f"Model '{model_name}' is not loaded or failed to initialize.")
-             return f"[Error: The '{model_name}' AI core is offline.]"

-         role = self.config["MODEL_ROLES"].get(model_name, "general")
-         system_prompt = self.config["MODEL_ROLE_SYSTEM_PROMPTS"].get(role, DEFAULT_SYSTEM_PROMPT)

-         llm = self.models[model_name]
          messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": query}]
          preset = INFERENCE_PRESETS.get(DEFAULT_INFERENCE_PRESET, {})
          try:
-             logger.info(f"Querying {model_name} ({role})...")
-             # Run the blocking call in a separate thread so it doesn't block the event loop
              response = await asyncio.to_thread(llm.create_chat_completion, messages=messages, **preset)
-             content = response['choices'][0]['message']['content'].strip()
-             logger.info(f"✅ Response received from {model_name}.")
-             return content
          except Exception as e:
-             logger.error(f"Error during inference for model '{model_name}': {e}", exc_info=True)
-             return f"[Error during inference for '{model_name}'. See logs for details.]"

-     async def _generate_synthesis(self, query: str, perspectives: Dict[str, str]) -> str:
-         """Uses a designated model to synthesize a summary from all perspectives."""
-         summarizer_model = "gemma"  # Gemma is good for this task
-         if summarizer_model not in self.models or self.models[summarizer_model] is None:
-             logger.warning(f"Summarizer model '{summarizer_model}' not available. Returning basic summary.")
-             return "Multiple perspectives were gathered, but the synthesis AI core is offline. Please review the detailed perspectives below."

-         valid_perspectives = {name: text for name, text in perspectives.items() if not text.startswith("[Error")}
-         if not valid_perspectives:
-             return "No valid perspectives could be generated to create a summary."
-         if len(valid_perspectives) == 1:
-             return next(iter(valid_perspectives.values()))

-         synthesis_prompt = f"Original User Question: \"{query}\"\n\nSynthesize the following distinct perspectives into a single, cohesive, and insightful summary. Capture the core agreements, disagreements, and unique insights from each viewpoint. Your goal is to provide a higher-order understanding that integrates these different analyses.\n\n"
-         for name, text in valid_perspectives.items():
              role = self.config["MODEL_ROLES"].get(name, "General")
-             synthesis_prompt += f"--- PERSPECTIVE FROM {name.upper()} ({role.capitalize()}) ---\n{text}\n\n"
-         synthesis_prompt += "--- SYNTHESIZED INSIGHT ---\n"

-         summary_system_prompt = "You are a master synthesis AI. Your role is to create a clear and insightful summary from the provided texts, acting as a final arbiter of truth."
-         return await self._get_single_perspective(summarizer_model, synthesis_prompt)

-     def _format_final_output(self, summary: str, perspectives: Dict[str, str]) -> str:
-         """Formats the final Markdown output for the Gradio interface."""
-         output = f"## ✨ ZOTHEOS Final Synthesized Insight ✨\n\n{summary}\n\n---\n\n### 💬 Detailed Individual Perspectives\n\n"
-         for model_name, text in perspectives.items():
-             role = self.config["MODEL_ROLES"].get(model_name, "General")
-             output += f"**Perspective from {model_name.capitalize()} ({role.capitalize()}):**\n{text}\n\n"
          return output.strip()

-     async def process_query_with_fusion(self, query: str, **kwargs):
-         """The main entry point for processing a user query with True Fusion."""
-         if not all(self.models.values()):
-             return "[Critical Error: Not all AI cores are online. Fusion is not possible.]"

-         # For now, we use all configured models. Tier logic can be re-added here.
-         models_to_query = list(self.models.keys())
-         self.models_last_queried_for_perspectives = models_to_query

-         logger.info(f"Initiating True Fusion for query: '{query[:60]}...' using models: {models_to_query}")

-         # Create and run all perspective-gathering tasks in parallel
-         perspective_tasks = [self._get_single_perspective(model_name, query) for model_name in models_to_query]
-         gathered_responses = await asyncio.gather(*perspective_tasks)
-         perspectives = dict(zip(models_to_query, gathered_responses))

-         # Generate the final synthesis based on the gathered perspectives
-         logger.info("All perspectives gathered. Generating final synthesis...")
-         synthesis = await self._generate_synthesis(query, perspectives)

-         # Format the final output for the user
-         logger.info("Synthesis complete. Formatting final output.")
-         return self._format_final_output(synthesis, perspectives)

      async def get_status_report(self) -> Dict[str, Any]:
-         """Provides a status report for the system."""
-         return {
-             "status": "Online - True Fusion Mode",
-             "loaded_models": [name for name, model in self.models.items() if model is not None],
-             "failed_models": [name for name, model in self.models.items() if model is None],
-             "last_queried_for_fusion": self.models_last_queried_for_perspectives,
-         }
 
+ # FILE: modules/main_fusion_public.py (Hugging Face Demo - AGI-Tier Upgrade)

  import asyncio
  import logging
+ import json
  from typing import Dict, Any, Optional, List

  try:
      from modules.config_settings_public import (
+         MODEL_PATHS, MODEL_SPECIFIC_PARAMS, INFERENCE_PRESETS,
+         DEFAULT_INFERENCE_PRESET, MODEL_ROLES, MODEL_ROLE_SYSTEM_PROMPTS, DEFAULT_SYSTEM_PROMPT
      )
      from llama_cpp import Llama
      LLAMA_CPP_AVAILABLE = True
  except ImportError as e:
      logging.basicConfig(level=logging.CRITICAL)
+     logging.critical(f"CRITICAL HF IMPORT ERROR: {e}.")
      LLAMA_CPP_AVAILABLE = False

+ logger = logging.getLogger("ZOTHEOS_MainFusion_HF")

+ # --- ✅ Simple In-Memory MemoryBank for the Web Demo ---
+ class MemoryBank:
+     """A simple, non-persistent memory bank for the web demo."""
+     def __init__(self):
+         self.memories: List[Dict[str, Any]] = []
+
+     async def store_memory_async(self, query: str, response: str, metadata: Optional[Dict[str, Any]] = None):
+         entry = {'query': query, 'response': response, 'metadata': metadata or {}}
+         self.memories.append(entry)
+         if len(self.memories) > 10:
+             self.memories.pop(0)  # Keep memory from growing too large
+
+     async def retrieve_recent_memories_async(self, limit: int = 5) -> List[Dict[str, Any]]:
+         return self.memories[-limit:]
+
+     async def get_all_memories_for_export_async(self) -> List[Dict[str, Any]]:
+         return list(self.memories)  # Return a copy
+
+ # --- ✅ The AGI-Tier Engine, Optimized for Web ---
  class MainFusionPublic:
      def __init__(self):
+         logger.info("🚀 ZOTHEOS HF Demo Engine Initializing...")
          if not LLAMA_CPP_AVAILABLE:
+             raise RuntimeError("Llama.cpp backend is not available.")

          self.models: Dict[str, Optional[Llama]] = {}
+         self.config = {"MODEL_ROLES": MODEL_ROLES, "MODEL_ROLE_SYSTEM_PROMPTS": MODEL_ROLE_SYSTEM_PROMPTS}
+         self.models_last_queried: List[str] = []
+         self.memory_bank = MemoryBank()
          self._initialize_models()

      def _initialize_models(self):
+         for name, path in MODEL_PATHS.items():
              try:
+                 logger.info(f"HF Demo: Loading model {name}...")
+                 params = MODEL_SPECIFIC_PARAMS.get('_default', {}).copy()
+                 params.update(MODEL_SPECIFIC_PARAMS.get(name, {}))
+                 self.models[name] = Llama(model_path=path, **params)
+                 logger.info(f"✅ HF Demo: Model '{name}' loaded.")
              except Exception as e:
+                 logger.error(f"❌ HF Demo: Failed to load model '{name}': {e}", exc_info=True)
+                 self.models[name] = None

+     async def _get_perspective(self, model_name: str, query: str, system_prompt: str) -> str:
+         llm = self.models.get(model_name)
+         if not llm:
+             return f"[Error: '{model_name}' core is offline.]"

          messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": query}]
          preset = INFERENCE_PRESETS.get(DEFAULT_INFERENCE_PRESET, {})
          try:
              response = await asyncio.to_thread(llm.create_chat_completion, messages=messages, **preset)
+             return response['choices'][0]['message']['content'].strip()
          except Exception as e:
+             logger.error(f"HF Demo Inference error for '{model_name}': {e}")
+             return f"[Error during inference for '{model_name}'.]"

+     async def _generate_true_synthesis(self, query: str, perspectives: Dict[str, str]) -> str:
+         summarizer_model = "gemma"
+         if summarizer_model not in self.models or not self.models[summarizer_model]:
+             return "Synthesis AI core is offline."

+         valid_perspectives = {k: v for k, v in perspectives.items() if not v.startswith("[Error")}
+         if not valid_perspectives:
+             return "No valid perspectives were generated."

+         synthesis_prompt = f"""
+ As the master intelligence of ZOTHEOS, perform a high-level synthesis of these perspectives on the user's query: "{query}".
+ Your goal is to find the deeper truth. Analyze the viewpoints to:
+ 1. Identify the core theme.
+ 2. Highlight the most significant tension between them.
+ 3. Extract a unique "aha!" insight that emerges from their combination.
+ 4. Conclude with a definitive, balanced statement.
+
+ Perspectives:
+ {json.dumps(valid_perspectives, indent=2)}
+
+ Your Final Synthesized Insight:
+ """
+         system_prompt = "You are a master synthesis AI. Create a clear, insightful summary from the provided texts."
+         return await self._get_perspective(summarizer_model, synthesis_prompt, system_prompt)

+     def _format_output(self, summary: str, perspectives: Dict[str, str]) -> str:
+         output = f"## ✨ ZOTHEOS Final Synthesized Insight ✨\n\n{summary}\n\n### 💬 Detailed Individual Perspectives\n\n"
+         for name, text in perspectives.items():
              role = self.config["MODEL_ROLES"].get(name, "General")
+             output += f"**Perspective from {name.capitalize()} ({role.capitalize()}):**\n{text}\n\n"
          return output.strip()

+     async def process_query_with_fusion(self, query: str, **kwargs) -> str:
+         online_models = [name for name, model in self.models.items() if model]
+         if not online_models:
+             return "[Critical Error: All AI cores are offline.]"
+         self.models_last_queried = online_models

+         tasks = {name: self._get_perspective(
+             name, query,
+             self.config["MODEL_ROLE_SYSTEM_PROMPTS"].get(self.config["MODEL_ROLES"].get(name, "general"), DEFAULT_SYSTEM_PROMPT)
+         ) for name in online_models}

+         responses = await asyncio.gather(*tasks.values())
+         perspectives = dict(zip(tasks.keys(), responses))

+         synthesis = await self._generate_true_synthesis(query, perspectives)
+         final_output = self._format_output(synthesis, perspectives)

+         if self.memory_bank:
+             await self.memory_bank.store_memory_async(query=query, response=final_output)

+         return final_output

      async def get_status_report(self) -> Dict[str, Any]:
+         return {"status": "Online - Web Demo Mode", "loaded_models": [name for name, model in self.models.items() if model is not None]}