# FILE: modules/main_fusion_public.py (Hugging Face Demo)
import asyncio
import logging
import json
from typing import Dict, Any, Optional, List
try:
from modules.config_settings_public import (
MODEL_PATHS, MODEL_SPECIFIC_PARAMS, INFERENCE_PRESETS,
DEFAULT_INFERENCE_PRESET, MODEL_ROLES, MODEL_ROLE_SYSTEM_PROMPTS, DEFAULT_SYSTEM_PROMPT
)
from llama_cpp import Llama
LLAMA_CPP_AVAILABLE = True
except ImportError as e:
    logging.basicConfig(level=logging.CRITICAL)
    logging.critical(f"CRITICAL HF IMPORT ERROR: {e}.")
    LLAMA_CPP_AVAILABLE = False
    Llama = None  # Keep the name bound so the type annotations below still resolve.
logger = logging.getLogger("ZOTHEOS_MainFusion_HF")
# --- ✅ Simple In-Memory MemoryBank for the Web Demo ---
class MemoryBank:
"""A simple, non-persistent memory bank for the web demo."""
def __init__(self):
self.memories: List[Dict[str, Any]] = []
async def store_memory_async(self, query: str, response: str, metadata: Optional[Dict[str, Any]] = None):
entry = {'query': query, 'response': response, 'metadata': metadata or {}}
self.memories.append(entry)
        if len(self.memories) > 10:
            self.memories.pop(0)  # Keep memory from growing too large
async def retrieve_recent_memories_async(self, limit: int = 5) -> List[Dict[str, Any]]:
return self.memories[-limit:]
async def get_all_memories_for_export_async(self) -> List[Dict[str, Any]]:
return list(self.memories) # Return a copy
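# Illustrative usage of the MemoryBank (a sketch; the names and values below
# are hypothetical and not referenced anywhere else in this module):
#
#   bank = MemoryBank()
#   await bank.store_memory_async("query", "response", {"source": "demo"})
#   recent = await bank.retrieve_recent_memories_async(limit=3)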
# --- ✅ The AGI-Tier Engine, Optimized for Web ---
class MainFusionPublic:
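    """Queries every loaded model core in parallel and fuses the results into one synthesized insight."""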
def __init__(self):
        logger.info("🚀 ZOTHEOS HF Demo Engine Initializing...")
if not LLAMA_CPP_AVAILABLE:
raise RuntimeError("Llama.cpp backend is not available.")
self.models: Dict[str, Optional[Llama]] = {}
self.config = {"MODEL_ROLES": MODEL_ROLES, "MODEL_ROLE_SYSTEM_PROMPTS": MODEL_ROLE_SYSTEM_PROMPTS}
self.models_last_queried: List[str] = []
self.memory_bank = MemoryBank()
self._initialize_models()
def _initialize_models(self):
for name, path in MODEL_PATHS.items():
try:
logger.info(f"HF Demo: Loading model {name}...")
params = MODEL_SPECIFIC_PARAMS.get('_default', {}).copy()
params.update(MODEL_SPECIFIC_PARAMS.get(name, {}))
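                # The merge above assumes a layout like this (illustrative only;
                # the real values live in config_settings_public, and the keys are
                # standard llama-cpp-python constructor arguments):
                #   MODEL_SPECIFIC_PARAMS = {
                #       "_default": {"n_ctx": 4096, "n_gpu_layers": 0, "verbose": False},
                #       "gemma": {"n_ctx": 8192},
                #   }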
self.models[name] = Llama(model_path=path, **params)
                logger.info(f"✅ HF Demo: Model '{name}' loaded.")
except Exception as e:
logger.error(f"❌ HF Demo: Failed to load model '{name}': {e}", exc_info=True)
self.models[name] = None
async def _get_perspective(self, model_name: str, query: str, system_prompt: str) -> str:
llm = self.models.get(model_name)
        if not llm:
            return f"[Error: '{model_name}' core is offline.]"
messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": query}]
preset = INFERENCE_PRESETS.get(DEFAULT_INFERENCE_PRESET, {})
try:
response = await asyncio.to_thread(llm.create_chat_completion, messages=messages, **preset)
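            # create_chat_completion returns an OpenAI-style completion dict, e.g.
            # {"choices": [{"message": {"role": "assistant", "content": "..."}}], ...}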
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logger.error(f"HF Demo Inference error for '{model_name}': {e}")
return f"[Error during inference for '{model_name}'.]"
async def _generate_true_synthesis(self, query: str, perspectives: Dict[str, str]) -> str:
summarizer_model = "gemma"
        if not self.models.get(summarizer_model):
            return "Synthesis AI core is offline."
valid_perspectives = {k: v for k, v in perspectives.items() if not v.startswith("[Error")}
        if not valid_perspectives:
            return "No valid perspectives were generated."
synthesis_prompt = f"""
As the master intelligence of ZOTHEOS, perform a high-level synthesis of these perspectives on the user's query: "{query}".
Your goal is to find the deeper truth. Analyze the viewpoints to:
1. Identify the core theme.
2. Highlight the most significant tension between them.
3. Extract a unique "aha!" insight that emerges from their combination.
4. Conclude with a definitive, balanced statement.
Perspectives:
{json.dumps(valid_perspectives, indent=2)}
Your Final Synthesized Insight:
"""
system_prompt = "You are a master synthesis AI. Create a clear, insightful summary from the provided texts."
return await self._get_perspective(summarizer_model, synthesis_prompt, system_prompt)
def _format_output(self, summary: str, perspectives: Dict[str, str]) -> str:
        output = f"## ✨ ZOTHEOS Final Synthesized Insight ✨\n\n{summary}\n\n### 💬 Detailed Individual Perspectives\n\n"
for name, text in perspectives.items():
role = self.config["MODEL_ROLES"].get(name, "General")
output += f"**Perspective from {name.capitalize()} ({role.capitalize()}):**\n{text}\n\n"
return output.strip()
async def process_query_with_fusion(self, query: str, **kwargs) -> str:
online_models = [name for name, model in self.models.items() if model]
        if not online_models:
            return "[Critical Error: All AI cores are offline.]"
self.models_last_queried = online_models
        tasks = {}
        for name in online_models:
            role = self.config["MODEL_ROLES"].get(name, "general")
            system_prompt = self.config["MODEL_ROLE_SYSTEM_PROMPTS"].get(role, DEFAULT_SYSTEM_PROMPT)
            tasks[name] = self._get_perspective(name, query, system_prompt)
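        # asyncio.gather preserves argument order and dicts preserve insertion
        # order (Python 3.7+), so zipping keys with the results realigns them.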
responses = await asyncio.gather(*tasks.values())
perspectives = dict(zip(tasks.keys(), responses))
synthesis = await self._generate_true_synthesis(query, perspectives)
final_output = self._format_output(synthesis, perspectives)
if self.memory_bank:
await self.memory_bank.store_memory_async(query=query, response=final_output)
return final_output
    async def get_status_report(self) -> Dict[str, Any]:
        return {
            "status": "Online - Web Demo Mode",
            "loaded_models": [name for name, model in self.models.items() if model is not None],
        }