ZOTHEOS committed
Commit ed8a3b2 · verified · 1 Parent(s): 9f05f25

Update modules/main_fusion_public.py

Files changed (1)
  1. modules/main_fusion_public.py +156 -379
modules/main_fusion_public.py CHANGED
@@ -1,379 +1,156 @@
- # FILE: modules/main_fusion_public.py (Finalized with Tier Logic, Async, Fusion Summary)
-
- import asyncio
- import logging
- import os
- import sys
- import time
- import json
- from typing import Dict, Any, Optional, List, Union
-
- try:
-     # Correctly determine project_root assuming this file is in 'modules'
-     script_dir = os.path.dirname(os.path.abspath(__file__))
-     project_root = os.path.dirname(script_dir)
-     if project_root not in sys.path:
-         sys.path.insert(0, project_root)
- except Exception as e_path:
-     # Basic logging if path setup fails, though critical
-     logging.basicConfig(level=logging.ERROR)
-     logging.error(f"Error setting up sys.path in main_fusion_public.py: {e_path}")
-
- try:
-     from modules.config_settings_public import (
-         MODEL_PATHS, MAX_CONCURRENT_MODELS, MAX_RAM_MODELS_GB, # Used by ModelManager init
-         DEFAULT_SYSTEM_PROMPT, SYSTEM_PERSONAS, INFERENCE_PRESETS, DEFAULT_INFERENCE_PRESET,
-         MODEL_ROLES, MODEL_ROLE_SYSTEM_PROMPTS
-     )
-     from modules.model_manager_public import ModelManager
-     from modules.memory_bank_public import MemoryBank
-     from modules.user_auth import get_user_tier
- except ImportError as e:
-     logging.basicConfig(level=logging.CRITICAL, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-     logging.critical(f"CRITICAL IMPORT ERROR in main_fusion_public.py: {e}. ZOTHEOS may not function.", exc_info=True)
-     # Provide fallbacks for critical components if possible, or allow failure
-     # For now, if these fail, the __init__ will likely raise an error or log critical status.
-     # Making the application fail loudly is often better than silent dysfunction.
-     # Consider exiting if critical imports fail: sys.exit(f"Fatal Import Error in main_fusion_public.py: {e}")
-
- # --- Start of FIX: Define LLAMA_CPP_AVAILABLE ---
- try:
-     # Attempt to import the core Llama class from llama_cpp
-     from llama_cpp import Llama # You might also need LlamaCppError if you use it
-     LLAMA_CPP_AVAILABLE = True
-     # print("DEBUG: llama_cpp imported successfully, LLAMA_CPP_AVAILABLE=True") # Optional debug print
- except ImportError:
-     LLAMA_CPP_AVAILABLE = False
-     # print("DEBUG: llama_cpp import failed, LLAMA_CPP_AVAILABLE=False") # Optional debug print
- except Exception as e_llama_import: # Catch other potential errors during import
-     LLAMA_CPP_AVAILABLE = False
-     # print(f"DEBUG: An unexpected error occurred during llama_cpp import: {e_llama_import}, LLAMA_CPP_AVAILABLE=False") # Optional debug print
- # --- End of FIX ---
-
- logger = logging.getLogger("ZOTHEOS_MainFusion")
- if not logger.handlers:
-     handler = logging.StreamHandler(sys.stdout)
-     formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
-     handler.setFormatter(formatter)
-     logger.addHandler(handler)
-     logger.setLevel(logging.INFO)
-
-
- class MainFusionPublic:
-     def __init__(self, device_preference: Optional[str] = "cuda"):
-         logger.info("🚀 ZOTHEOS MainFusion Initializing (Tier Logic, Async Fusion Summary Enabled)...")
-
-         self.config = {
-             "MODEL_ROLES": MODEL_ROLES,
-             "MODEL_ROLE_SYSTEM_PROMPTS": MODEL_ROLE_SYSTEM_PROMPTS,
-             "DEFAULT_SYSTEM_PROMPT": DEFAULT_SYSTEM_PROMPT,
-             "TIER_CONFIG": {
-                 "free": {"model_limit": 2, "memory_enabled": False, "display_name": "Free Tier"},
-                 "starter": {"model_limit": 3, "memory_enabled": True, "display_name": "Starter Tier"},
-                 # Pro uses max models from MODEL_PATHS, ensuring it uses all available configured models
-                 "pro": {"model_limit": len(MODEL_PATHS.keys()) if MODEL_PATHS else 3, "memory_enabled": True, "display_name": "Pro Tier"}
-             }
-         }
-         self.models_last_queried_for_perspectives: List[str] = []
-
-         try:
-             # Pass global config values for ModelManager initialization
-             logger.info(f"Initializing ModelManager with device_preference='{device_preference}', max_count={MAX_CONCURRENT_MODELS}, max_ram_gb={MAX_RAM_MODELS_GB}...")
-             self.model_manager = ModelManager(
-                 device_preference=device_preference,
-                 max_model_count=MAX_CONCURRENT_MODELS,
-                 max_ram_models_gb=MAX_RAM_MODELS_GB
-             )
-             # active_model_names_in_order is the master list of all models ZOTHEOS *could* use
-             self.active_model_names_in_order: List[str] = list(MODEL_PATHS.keys()) if MODEL_PATHS else []
-             logger.info(f"✅ MainFusion initialized. Max available models for fusion: {self.active_model_names_in_order}")
-         except Exception as e_mm_init:
-             logger.critical(f"❌ CRITICAL: ModelManager failed to initialize in MainFusion: {e_mm_init}", exc_info=True)
-             self.model_manager = None
-             self.active_model_names_in_order = []
-
-         try:
-             self.memory_bank = MemoryBank()
-             logger.info(" MemoryBank initialized.")
-         except Exception as e_mb_init:
-             logger.error(f" MemoryBank failed to initialize: {e_mb_init}. Interactions may not be stored.", exc_info=True)
-             self.memory_bank = None
-
-         if not self.model_manager or not LLAMA_CPP_AVAILABLE: # Check if ModelManager itself or Llama backend failed
-             logger.critical("MainFusion started in a DEGRADED state: ModelManager or Llama.cpp backend is UNAVAILABLE.")
-
-
-     async def _get_single_model_response_direct(self, model_name: str, user_query: str, system_prompt_for_call: str, preset_name_for_call: str) -> Dict[str, Any]:
-         response_text = f"[Error: Model '{model_name}' generation did not complete or model unavailable]"
-         start_time_model = time.perf_counter()
-         status = "Model Error or Unavailable"
-
-         if not self.model_manager:
-             logger.error(f"[{model_name}] ModelManager not available for generation.")
-             return {"model": model_name, "text": "[Error: ModelManager is offline]", "time_ms": 0, "status": "ModelManager Offline"}
-
-         try:
-             logger.info(f"⚙️ [{model_name}] Calling ModelManager.generate_with_model. System Prompt: '{system_prompt_for_call[:50]}...'. User Query: '{user_query[:50].replace(chr(10),' ')}...'")
-             # generate_with_model handles actual call to Llama instance via async_generation_wrapper
-             response_text = await self.model_manager.generate_with_model(
-                 model_name=model_name,
-                 prompt=user_query,
-                 preset_name=preset_name_for_call,
-                 system_prompt=system_prompt_for_call
-             )
-             preview = response_text[:100].replace("\n", " ")
-             if response_text.startswith(("[Error:", "[No text generated", "[Malformed response")) or not response_text.strip():
-                 logger.warning(f"⚠️ [{model_name}] Model returned an error string or empty content: {preview}...")
-                 status = "Model Error or Empty Response"
-             else:
-                 logger.info(f"✅ [{model_name}] Response received ({len(response_text)} chars): {preview}...")
-                 status = "Success"
-         except Exception as e:
-             logger.error(f"❌ [{model_name}] Exception in _get_single_model_response_direct: {e}", exc_info=True)
-             response_text = f"[Error: MainFusion encountered an exception during {model_name} generation: {type(e).__name__}]"
-             status = "MainFusion Exception"
-         inference_time_ms = (time.perf_counter() - start_time_model) * 1000
-         return {"model": model_name, "text": response_text.strip(), "time_ms": round(inference_time_ms, 2), "status": status}
-
-     async def _generate_true_fusion_summary(self, raw_responses_dict: Dict[str, str], original_query: str, models_actually_used: List[str]) -> str:
-         logger.info(f"Attempting to generate a true fusion summary from {len(models_actually_used)} perspective(s).")
-         summarizer_model_key = "gemma" # Default summarizer, ensure it's in MODEL_PATHS and loadable
-
-         valid_responses_for_summary = {
-             model: text for model, text in raw_responses_dict.items()
-             if model in models_actually_used and text and not text.startswith("[Error:")
-         }
-
-         if not valid_responses_for_summary: # No valid responses at all
-             logger.warning("No valid responses available to generate a fusion summary.")
-             return "A summary could not be generated as no valid perspectives were successfully gathered."
-
-         if len(valid_responses_for_summary) == 1:
-             single_model_name = list(valid_responses_for_summary.keys())[0]
-             logger.info(f"Only one valid perspective from {single_model_name}. Using it as the summary.")
-             return f"Based on the single available perspective from **{single_model_name.capitalize()}**:\n\n{list(valid_responses_for_summary.values())[0]}"
-
-         fusion_prompt = f"Original User Question: \"{original_query}\"\n\n"
-         fusion_prompt += "You are the Fusion Summary engine for ZOTHEOS. Your task is to read the multiple perspectives provided below and write a concise, balanced, and synthesized summary. Capture the most important points, common themes, and notable differences from all viewpoints. Your tone should be thoughtful, neutral, and respectful. Structure the summary clearly.\n\n"
-
-         for model_name, text in valid_responses_for_summary.items():
-             role = self.config["MODEL_ROLES"].get(model_name, "General")
-             fusion_prompt += f"--- PERSPECTIVE FROM {model_name.upper()} ({role.capitalize()}) ---\n{text}\n\n"
-
-         fusion_prompt += "--- SYNTHESIZED SUMMARY (Combine the perspectives above into a unified insight) ---\n"
-
-         logger.info(f"Calling summarizer model '{summarizer_model_key}' (from available: {self.active_model_names_in_order}) for fusion summary.")
-
-         if not self.model_manager:
-             logger.error("ModelManager not available for generating fusion summary.")
-             return "[Error: Summarizer service unavailable - ModelManager offline]"
-
-         try:
-             # The system prompt for the summarizer itself
-             summarizer_system_prompt = "You are an expert synthesis AI. Your role is to create a coherent and insightful summary from the provided texts."
-
-             summary_text = await self.model_manager.generate_with_model(
-                 model_name=summarizer_model_key,
-                 prompt=fusion_prompt, # The full context with all perspectives
-                 preset_name="precise", # Use a precise preset for summarization
-                 system_prompt=summarizer_system_prompt
-             )
-             if summary_text and not summary_text.startswith("[Error:"):
-                 logger.info("✅ True fusion summary generated successfully.")
-                 return summary_text.strip()
-             else:
-                 logger.warning(f"Summarizer model '{summarizer_model_key}' returned an error or empty response: {summary_text}")
-                 return "[Warning: Summary generation was partial or failed. Displaying raw perspectives.]"
-         except Exception as e:
-             logger.error(f"❌ Exception while generating true fusion summary with '{summarizer_model_key}': {e}", exc_info=True)
-             return f"[Error: The summary generation process failed. Exception: {type(e).__name__}]"
-
-     def _analyze_responses_basic(self, responses_dict: Dict[str, str], model_roles: Dict[str, str]) -> Dict[str, Any]:
-         valid_responses = {model: text for model, text in responses_dict.items() if text and not text.startswith("[Error:")}
-         consensus_points = []
-         if len(valid_responses) > 1: consensus_points.append("Multiple perspectives were gathered and synthesized.")
-         elif len(valid_responses) == 1: consensus_points.append("A single primary perspective was available for synthesis.")
-         else: consensus_points.append("No valid primary perspectives were available for synthesis.")
-         return {"consensus_points": consensus_points, "contradictions": [], "unique_insights": valid_responses}
-
-     def _synthesize_fusion_response(self, analysis_result: dict, model_roles: dict, raw_responses_dict: dict, final_summary_text: str, models_used_for_perspectives: List[str]) -> str:
-         response_parts = []
-
-         response_parts.append("## ✨ ZOTHEOS Final Synthesized Insight ✨")
-         response_parts.append(final_summary_text if final_summary_text and not final_summary_text.startswith(("[Error:", "[Warning:")) else "*Synthesis process encountered an issue or no summary was generated. Please see detailed perspectives below.*")
-         response_parts.append("\n---\n")
-
-         response_parts.append("### 💬 Detailed Individual Perspectives")
-         has_any_valid_perspectives = False
-         for model_name in models_used_for_perspectives:
-             text = raw_responses_dict.get(model_name)
-             role = model_roles.get(model_name, "General") # Default role
-             response_parts.append(f"**Perspective from {model_name.capitalize()} ({role.capitalize()}):**")
-             if text and not text.startswith("[Error:"):
-                 response_parts.append(text.strip())
-                 has_any_valid_perspectives = True
-             else:
-                 response_parts.append(f"*{text if text else '[No response or error from this model.]'}*") # Display error or placeholder
-             response_parts.append("")
-         if not has_any_valid_perspectives and not (final_summary_text and not final_summary_text.startswith(("[Error:", "[Warning:"))):
-             # If summary also failed/empty and no valid individual perspectives.
-             response_parts = ["## ⚠️ ZOTHEOS Alert\n\nUnfortunately, ZOTHEOS encountered issues processing your query with all available AI cores for your tier. No insights could be gathered at this time. Please try rephrasing your query or try again later."]
-         elif not has_any_valid_perspectives: # Summary might be there, but no individual details.
-             response_parts.append("*No valid individual perspectives were successfully retrieved to display in detail.*")
-
-
-         return "\n".join(response_parts).strip()
-
-     async def process_query_with_fusion(
-         self,
-         query: str,
-         user_token: Optional[str] = None,
-         persona_key: Optional[str] = None,
-         fusion_mode_override: str = "balanced",
-         **kwargs
-     ) -> str:
-         process_start_time = time.time()
-         current_tier_name = get_user_tier(user_token if user_token else "")
-         tier_settings = self.config["TIER_CONFIG"].get(current_tier_name, self.config["TIER_CONFIG"]["free"])
-         tier_model_limit = tier_settings["model_limit"]
-         tier_memory_enabled = tier_settings["memory_enabled"]
-         logger.info(f"User Tier: '{current_tier_name}' ({tier_settings['display_name']}). Model Limit: {tier_model_limit}, Memory: {'Enabled' if tier_memory_enabled else 'Disabled'}.")
-
-         if not self.model_manager or not LLAMA_CPP_AVAILABLE: return "[Error: ZOTHEOS Core Model Manager not ready or Llama.cpp backend unavailable.]"
-         if not self.active_model_names_in_order: return "[Error: ZOTHEOS Core not ready. No models configured in MODEL_PATHS.]"
-         if not query or not query.strip(): return "[Error: Query is empty. Please provide a question or topic.]"
-
-         current_query_text = query
-         current_preset_name = fusion_mode_override if fusion_mode_override in INFERENCE_PRESETS else DEFAULT_INFERENCE_PRESET
-         base_persona_prompt = SYSTEM_PERSONAS.get(persona_key or "default", self.config["DEFAULT_SYSTEM_PROMPT"])
-
-         # Determine actual models to use based on tier limit and availability
-         models_to_use_for_perspectives = [m for m in self.active_model_names_in_order if m in MODEL_PATHS][:tier_model_limit]
-         self.models_last_queried_for_perspectives = models_to_use_for_perspectives # For status report
-
-         if not models_to_use_for_perspectives:
-             logger.error(f"No models available for tier '{current_tier_name}' after applying limit of {tier_model_limit}.")
-             return f"[Error: No models available for your current tier ('{current_tier_name}').]"
-
-         logger.info(f"🔎 Processing query. Models for perspectives (Tier: {current_tier_name}): {models_to_use_for_perspectives}. Preset: '{current_preset_name}'. Query: '{current_query_text[:60].replace(chr(10),' ')}...'")
-
-         raw_responses_dict: Dict[str, str] = {}
-         individual_results_for_memory: List[Dict[str, Any]] = []
-         successful_responses = 0
-
-         for model_name in models_to_use_for_perspectives:
-             model_role = self.config["MODEL_ROLES"].get(model_name, "general")
-             system_prompt_for_model = self.config["MODEL_ROLE_SYSTEM_PROMPTS"].get(model_role, base_persona_prompt)
-             query_for_this_model = current_query_text
-             if model_name.lower() == "gemma" and system_prompt_for_model:
-                 query_for_this_model = f"<start_of_turn>user\n{system_prompt_for_model.strip()}\n{current_query_text}<end_of_turn>\n<start_of_turn>model\n"
-                 system_prompt_for_model = ""
-             model_output_data = await self._get_single_model_response_direct(model_name, query_for_this_model, system_prompt_for_model, current_preset_name)
-             individual_results_for_memory.append(model_output_data)
-             raw_responses_dict[model_name] = model_output_data.get("text", "[Error: No text field in response data]")
-             if model_output_data.get("status") == "Success":
-                 successful_responses += 1
-
-         synthesized_summary_text = await self._generate_true_fusion_summary(raw_responses_dict, current_query_text, models_to_use_for_perspectives)
-         analysis_result = self._analyze_responses_basic(raw_responses_dict, self.config["MODEL_ROLES"]) # Basic analysis for now
-         final_fused_output_content = self._synthesize_fusion_response(analysis_result, self.config["MODEL_ROLES"], raw_responses_dict, synthesized_summary_text, models_to_use_for_perspectives)
-
-         persona_display = (persona_key or "default").capitalize()
-         mode_display = current_preset_name.capitalize()
-         tier_display_name = tier_settings.get("display_name", current_tier_name.capitalize())
-         final_header = f"## 🧠 ZOTHEOS Fused Perspectives 🧠\n*(Fusion Mode: {mode_display} | Persona: {persona_display} | Tier: {tier_display_name})*\n\n"
-         final_fused_output = final_header + final_fused_output_content
-
-         if successful_responses == 0 and not "[Error:" in final_fused_output_content and not "[Warning:" in final_fused_output_content:
-             logger.error(f"All models ({len(models_to_use_for_perspectives)}) failed for tier '{current_tier_name}'.")
-             final_fused_output = final_header + "[Critical Error: ZOTHEOS was unable to obtain any valid responses from its AI cores for this query.]\n\n" + final_fused_output_content.split("\n\n",1)[-1]
-
-         if tier_memory_enabled:
-             if self.memory_bank:
-                 try:
-                     memory_metadata = {
-                         "user_token_used_prefix": user_token[:3] + "***" if user_token and len(user_token) > 3 else "N/A (No Token)" if not user_token else user_token,
-                         "tier_at_interaction": current_tier_name,
-                         "persona_key": persona_key or "default", "fusion_mode_used": current_preset_name,
-                         # timestamp_iso is now added within store_memory_async
-                         "duration_seconds": round(time.time() - process_start_time, 3),
-                         "active_models_queried": models_to_use_for_perspectives, # Models actually used for perspectives
-                         "individual_model_outputs": individual_results_for_memory, # Detailed dicts
-                         "synthesized_summary_text": synthesized_summary_text, # The AI-generated summary
-                         "fused_response_length_chars": len(final_fused_output),
-                         "successful_model_responses": successful_responses,
-                         "total_models_queried": len(models_to_use_for_perspectives)
-                     }
-                     await self.memory_bank.store_memory_async(query=current_query_text, response=final_fused_output, metadata=memory_metadata)
-                 except Exception as e_mem: logger.error(f"Failed to store fusion interaction in MemoryBank (Tier: '{current_tier_name}'): {e_mem}", exc_info=True)
-             else: logger.warning(f"MemoryBank not initialized. Skipping storage (Tier: '{current_tier_name}').")
-         else: logger.info(f"Memory storage disabled for tier '{current_tier_name}'. Skipping storage.")
-
-         total_processing_time = round(time.time() - process_start_time, 2)
-         logger.info(f"🧠 Fusion complete in {total_processing_time}s. Output len: {len(final_fused_output)}. Models used: {len(models_to_use_for_perspectives)} (Tier: {current_tier_name}).")
-         return final_fused_output
-
-     async def get_status_report(self) -> Dict[str, Any]:
-         report: Dict[str, Any] = {
-             "status": "Full Multi-Model Fusion Mode Active",
-             "fusion_engine_status": "Online" if self.model_manager and self.active_model_names_in_order and LLAMA_CPP_AVAILABLE else "Degraded/Offline",
-             "all_available_models": self.active_model_names_in_order,
-             "models_last_queried_for_perspectives": getattr(self, 'models_last_queried_for_perspectives', []),
-             "model_manager_status": "Online" if self.model_manager else "Offline/Init Failed",
-             "llama_cpp_backend_available": LLAMA_CPP_AVAILABLE,
-             "memory_bank_status": "Online" if self.memory_bank else "Offline/Init Failed"
-         }
-         if self.model_manager and hasattr(self.model_manager, 'get_loaded_model_stats'):
-             try: report["model_manager_runtime_stats"] = self.model_manager.get_loaded_model_stats()
-             except Exception as e: report["model_manager_runtime_stats"] = f"Error getting MM stats: {e}"
-         else: report["model_manager_runtime_stats"] = "ModelManager N/A for stats."
-
-         if self.memory_bank and hasattr(self.memory_bank, 'get_memory_stats'):
-             try: report["memory_bank_stats"] = await self.memory_bank.get_memory_stats()
-             except Exception as e: report["memory_bank_stats"] = f"Error getting MB stats: {e}"
-         else: report["memory_bank_stats"] = "MemoryBank N/A for stats."
-         return report
-
- if __name__ == "__main__":
-     if os.getenv("ZOTHEOS_DEBUG_DEPS", "false").lower() != "true":
-         for lib_logger_name in ["torch", "huggingface_hub", "psutil", "llama_cpp", "httpx", "PIL"]: logging.getLogger(lib_logger_name).setLevel(logging.WARNING)
-     logger.setLevel(logging.DEBUG)
-     logger.info("--- MainFusionPublic (Tier Logic & Async Summary) CLI Test ---")
-     async def run_main_fusion_cli_test_with_token(test_token=None, token_desc="Default (Free Tier)"):
-         main_fusion_instance: Optional[MainFusionPublic] = None
-         logger.info(f"\n--- Testing with token: '{token_desc}' ---")
-         try:
-             main_fusion_instance = MainFusionPublic(device_preference="cuda")
-             if not main_fusion_instance.model_manager or not main_fusion_instance.active_model_names_in_order or not LLAMA_CPP_AVAILABLE:
-                 logger.critical("CLI Test Aborted: MainFusion init failed (MM or LlamaCPP unavailable)."); return
-             test_query = "What are the core principles of Stoicism and how can they be applied in modern life?"
-             logger.info(f"CLI Test: Querying (Token: {test_token[:3] + '...' if test_token and len(test_token)>3 else test_token}): '{test_query}'")
-             response = await main_fusion_instance.process_query_with_fusion(query=test_query, user_token=test_token, persona_key="philosopher", fusion_mode_override="balanced")
-             print("\n" + "="*25 + f" CLI Test Response ({token_desc}) " + "="*25); print(response); print("="* (50 + len(f" CLI Test Response ({token_desc}) ") + 2))
-             status = await main_fusion_instance.get_status_report()
-             print("\nSystem Status Report After Query:"); print(json.dumps(status, indent=2, default=str))
-         except Exception as e: logger.critical(f"Error during CLI test ({token_desc}): {e}", exc_info=True); print(f"🚨 CLI Test Error ({token_desc}): {e}")
-         finally:
-             if main_fusion_instance and main_fusion_instance.model_manager and hasattr(main_fusion_instance.model_manager, 'shutdown'):
-                 logger.info(f"CLI Test ({token_desc}): Shutting down ModelManager...")
-                 main_fusion_instance.model_manager.shutdown() # This is synchronous
-             logger.info(f"🛑 MainFusion CLI test ({token_desc}) shutdown.")
-     async def run_all_cli_tests():
-         # Ensure stripe_users.json is in project root for this test to work with tokens
-         tokens_to_test = {
-             None: "No Token (Defaults to Free)",
-             "TOKEN_FOR_FREE_TEST": "Free Tier Token", # Add this to your stripe_users.json
-             "TOKEN_FOR_STARTER_TEST": "Starter Tier Token", # Add this
-             "TOKEN_FOR_PRO_TEST": "Pro Tier Token" # Add this
-         }
-         # Create dummy stripe_users.json if not exists for test
-         if not os.path.exists(os.path.join(project_root, "stripe_users.json")):
-             logger.warning("Creating dummy stripe_users.json for CLI test.")
-             dummy_users = {t: t.split('_')[1].lower() for t in tokens_to_test if t} # type: ignore
-             with open(os.path.join(project_root, "stripe_users.json"), "w") as f:
-                 json.dump(dummy_users, f, indent=2)
-
-         for token, desc in tokens_to_test.items():
-             await run_main_fusion_cli_test_with_token(token, desc)
-
-     asyncio.run(run_all_cli_tests())
 
+ # FILE: modules/main_fusion_public.py (True Fusion, Verified Launch Version)
+
+ import asyncio
+ import logging
+ from typing import Dict, Any, Optional, List
+
+ # These imports will now work correctly with the verified config file
+ try:
+     from modules.config_settings_public import (
+         MODEL_PATHS,
+         MODEL_SPECIFIC_PARAMS,
+         INFERENCE_PRESETS,
+         DEFAULT_INFERENCE_PRESET,
+         MODEL_ROLES,
+         MODEL_ROLE_SYSTEM_PROMPTS,
+         DEFAULT_SYSTEM_PROMPT
+     )
+     from llama_cpp import Llama
+     LLAMA_CPP_AVAILABLE = True
+ except ImportError as e:
+     # This is a critical failure, the app cannot run without these.
+     logging.basicConfig(level=logging.CRITICAL)
+     logging.critical(f"CRITICAL IMPORT ERROR in main_fusion_public.py: {e}. ZOTHEOS cannot function.")
+     LLAMA_CPP_AVAILABLE = False
+
+
+ logger = logging.getLogger("ZOTHEOS_MainFusion")
+ if not logger.handlers:
+     # Configure logger if it hasn't been configured yet
+     handler = logging.StreamHandler()
+     formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
+     handler.setFormatter(formatter)
+     logger.addHandler(handler)
+     logger.setLevel(logging.INFO)
+
+
+ class MainFusionPublic:
+     """
+     The core engine for ZOTHEOS. It loads multiple AI models, queries them in parallel
+     with distinct persona-based prompts, and then synthesizes their responses into a
+     higher-order summary.
+     """
+     def __init__(self):
+         logger.info("🚀 ZOTHEOS MainFusion Initializing (True Fusion Mode)...")
+         if not LLAMA_CPP_AVAILABLE:
+             raise RuntimeError("Llama.cpp backend is not available. ZOTHEOS cannot start.")
+
+         self.models: Dict[str, Optional[Llama]] = {}
+         self.config = {
+             "MODEL_ROLES": MODEL_ROLES,
+             "MODEL_ROLE_SYSTEM_PROMPTS": MODEL_ROLE_SYSTEM_PROMPTS,
+         }
+         self.models_last_queried_for_perspectives: List[str] = []
+
+         self._initialize_models()
+
+     def _initialize_models(self):
+         """Loads all models defined in the config into memory."""
+         for model_name, model_path in MODEL_PATHS.items():
+             try:
+                 logger.info(f"Loading model: {model_name}...")
+                 params = MODEL_SPECIFIC_PARAMS['_default'].copy()
+                 params.update(MODEL_SPECIFIC_PARAMS.get(model_name, {}))
+
+                 self.models[model_name] = Llama(model_path=model_path, **params)
+                 logger.info(f"✅ Model '{model_name}' loaded successfully.")
+             except Exception as e:
+                 logger.error(f"❌ Failed to load model '{model_name}': {e}", exc_info=True)
+                 self.models[model_name] = None
+
+     async def _get_single_perspective(self, model_name: str, query: str, system_prompt_override: Optional[str] = None) -> str:
+         """Queries a single model with its assigned role (or an explicit system prompt) and returns the response."""
+         if model_name not in self.models or self.models[model_name] is None:
+             logger.warning(f"Model '{model_name}' is not loaded or failed to initialize.")
+             return f"[Error: The '{model_name}' AI core is offline.]"
+
+         role = self.config["MODEL_ROLES"].get(model_name, "general")
+         system_prompt = system_prompt_override or self.config["MODEL_ROLE_SYSTEM_PROMPTS"].get(role, DEFAULT_SYSTEM_PROMPT)
+
+         llm = self.models[model_name]
+         messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": query}]
+         preset = INFERENCE_PRESETS.get(DEFAULT_INFERENCE_PRESET, {})
+
+         try:
+             logger.info(f"Querying {model_name} ({role})...")
+             # Run the blocking call in a separate thread to not block the event loop
+             response = await asyncio.to_thread(llm.create_chat_completion, messages=messages, **preset)
+             content = response['choices'][0]['message']['content'].strip()
+             logger.info(f"✅ Response received from {model_name}.")
+             return content
+         except Exception as e:
+             logger.error(f"Error during inference for model '{model_name}': {e}", exc_info=True)
+             return f"[Error during inference for '{model_name}'. See logs for details.]"
+
+     async def _generate_synthesis(self, query: str, perspectives: Dict[str, str]) -> str:
+         """Uses a designated model to synthesize a summary from all perspectives."""
+         summarizer_model = "gemma" # Gemma is good for this task
+         if summarizer_model not in self.models or self.models[summarizer_model] is None:
+             logger.warning(f"Summarizer model '{summarizer_model}' not available. Returning basic summary.")
+             return "Multiple perspectives were gathered, but the synthesis AI core is offline. Please review the detailed perspectives below."
+
+         valid_perspectives = {name: text for name, text in perspectives.items() if not text.startswith("[Error")}
+         if not valid_perspectives:
+             return "No valid perspectives could be generated to create a summary."
+         if len(valid_perspectives) == 1:
+             return next(iter(valid_perspectives.values()))
+
+         synthesis_prompt = f"Original User Question: \"{query}\"\n\nSynthesize the following distinct perspectives into a single, cohesive, and insightful summary. Capture the core agreements, disagreements, and unique insights from each viewpoint. Your goal is to provide a higher-order understanding that integrates these different analyses.\n\n"
+         for name, text in valid_perspectives.items():
+             role = self.config["MODEL_ROLES"].get(name, "General")
+             synthesis_prompt += f"--- PERSPECTIVE FROM {name.upper()} ({role.capitalize()}) ---\n{text}\n\n"
+         synthesis_prompt += "--- SYNTHESIZED INSIGHT ---\n"
+
+         # Pass the dedicated synthesis prompt through explicitly (it was previously assigned but unused)
+         summary_system_prompt = "You are a master synthesis AI. Your role is to create a clear and insightful summary from the provided texts, acting as a final arbiter of truth."
+         return await self._get_single_perspective(summarizer_model, synthesis_prompt, system_prompt_override=summary_system_prompt)
+
+     def _format_final_output(self, summary: str, perspectives: Dict[str, str]) -> str:
+         """Formats the final Markdown output for the Gradio interface."""
+         output = f"## ✨ ZOTHEOS Final Synthesized Insight ✨\n\n{summary}\n\n---\n\n### 💬 Detailed Individual Perspectives\n\n"
+         for model_name, text in perspectives.items():
+             role = self.config["MODEL_ROLES"].get(model_name, "General")
+             output += f"**Perspective from {model_name.capitalize()} ({role.capitalize()}):**\n{text}\n\n"
+         return output.strip()
+
+     async def process_query_with_fusion(self, query: str, **kwargs) -> str:
+         """The main entry point for processing a user query with True Fusion."""
+         if not all(self.models.values()):
+             return "[Critical Error: Not all AI cores are online. Fusion is not possible.]"
+
+         # For now, we use all configured models. Tier logic can be re-added here.
+         models_to_query = list(self.models.keys())
+         self.models_last_queried_for_perspectives = models_to_query
+
+         logger.info(f"Initiating True Fusion for query: '{query[:60]}...' using models: {models_to_query}")
+
+         # Create and run all perspective-gathering tasks in parallel
+         perspective_tasks = [self._get_single_perspective(model_name, query) for model_name in models_to_query]
+         gathered_responses = await asyncio.gather(*perspective_tasks)
+         perspectives = dict(zip(models_to_query, gathered_responses))
+
+         # Generate the final synthesis based on the gathered perspectives
+         logger.info("All perspectives gathered. Generating final synthesis...")
+         synthesis = await self._generate_synthesis(query, perspectives)
+
+         # Format the final output for the user
+         logger.info("Synthesis complete. Formatting final output.")
+         return self._format_final_output(synthesis, perspectives)
+
+     async def get_status_report(self) -> Dict[str, Any]:
+         """Provides a status report for the system."""
+         return {
+             "status": "Online - True Fusion Mode",
+             "loaded_models": [name for name, model in self.models.items() if model is not None],
+             "failed_models": [name for name, model in self.models.items() if model is None],
+             "last_queried_for_fusion": self.models_last_queried_for_perspectives,
+         }
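
For reference, a minimal driver sketch showing how the rewritten engine might be exercised. This script is not part of the commit: the file name demo_fusion.py and the example query are illustrative, and it assumes the verified config modules are importable from the project root and that llama-cpp-python is installed.

# demo_fusion.py — hypothetical driver, not part of this commit
import asyncio

from modules.main_fusion_public import MainFusionPublic

async def main() -> None:
    # Loads every model in MODEL_PATHS; raises RuntimeError if the llama.cpp backend is missing
    engine = MainFusionPublic()

    # Inspect which cores actually loaded before querying
    report = await engine.get_status_report()
    print("Loaded cores:", report["loaded_models"])
    print("Failed cores:", report["failed_models"])

    # Gathers all perspectives in parallel, then prints the synthesized Markdown output
    answer = await engine.process_query_with_fusion("What are the core principles of Stoicism?")
    print(answer)

if __name__ == "__main__":
    asyncio.run(main())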