# FILE: modules/main_fusion_public.py (Hugging Face Demo)

import asyncio
import logging
import json
from typing import Dict, Any, Optional, List

try:
    from modules.config_settings_public import (
        MODEL_PATHS, MODEL_SPECIFIC_PARAMS, INFERENCE_PRESETS,
        DEFAULT_INFERENCE_PRESET, MODEL_ROLES, MODEL_ROLE_SYSTEM_PROMPTS, DEFAULT_SYSTEM_PROMPT
    )
    from llama_cpp import Llama
    LLAMA_CPP_AVAILABLE = True
except ImportError as e:
    logging.basicConfig(level=logging.CRITICAL)
    logging.critical(f"CRITICAL HF IMPORT ERROR: {e}.")
    LLAMA_CPP_AVAILABLE = False
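
# NOTE: Illustrative sketch only, not part of the original file. This module
# assumes modules.config_settings_public exposes values shaped roughly like
# the following (paths and preset values here are hypothetical; "gemma" and
# the "general" fallback role do appear in the code below):
#
#   MODEL_PATHS = {"gemma": "models/gemma.gguf", ...}        # model name -> GGUF path
#   MODEL_ROLES = {"gemma": "general", ...}                  # model name -> role key
#   MODEL_ROLE_SYSTEM_PROMPTS = {"general": "You are ...", ...}  # role key -> prompt
#   INFERENCE_PRESETS = {"balanced": {"temperature": 0.7}, ...}  # kwargs for create_chat_completion
#   DEFAULT_INFERENCE_PRESET = "balanced"
#
# The authoritative definitions live in config_settings_public.py.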

logger = logging.getLogger("ZOTHEOS_MainFusion_HF")


# --- ✅ Simple In-Memory MemoryBank for the Web Demo ---
class MemoryBank:
    """A simple, non-persistent memory bank for the web demo."""

    def __init__(self):
        self.memories: List[Dict[str, Any]] = []

    async def store_memory_async(self, query: str, response: str, metadata: Optional[Dict[str, Any]] = None):
        entry = {'query': query, 'response': response, 'metadata': metadata or {}}
        self.memories.append(entry)
        if len(self.memories) > 10:
            self.memories.pop(0)  # Keep memory from growing too large

    async def retrieve_recent_memories_async(self, limit: int = 5) -> List[Dict[str, Any]]:
        return self.memories[-limit:]

    async def get_all_memories_for_export_async(self) -> List[Dict[str, Any]]:
        return list(self.memories)  # Return a copy
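
# Usage sketch (illustrative, not part of the original file): the bank is a
# FIFO capped at 10 entries, so the oldest memory is evicted first.
#
#   bank = MemoryBank()
#   await bank.store_memory_async("What is truth?", "A synthesized answer...")
#   recent = await bank.retrieve_recent_memories_async(limit=3)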

# --- ✅ The AGI-Tier Engine, Optimized for Web ---
class MainFusionPublic:
    def __init__(self):
        logger.info("🚀 ZOTHEOS HF Demo Engine Initializing...")
        if not LLAMA_CPP_AVAILABLE:
            raise RuntimeError("Llama.cpp backend is not available.")
        self.models: Dict[str, Optional[Llama]] = {}
        self.config = {"MODEL_ROLES": MODEL_ROLES, "MODEL_ROLE_SYSTEM_PROMPTS": MODEL_ROLE_SYSTEM_PROMPTS}
        self.models_last_queried: List[str] = []
        self.memory_bank = MemoryBank()
        self._initialize_models()

    def _initialize_models(self):
        for name, path in MODEL_PATHS.items():
            try:
                logger.info(f"HF Demo: Loading model {name}...")
                # Merge shared '_default' settings with any per-model overrides.
                params = MODEL_SPECIFIC_PARAMS.get('_default', {}).copy()
                params.update(MODEL_SPECIFIC_PARAMS.get(name, {}))
                self.models[name] = Llama(model_path=path, **params)
                logger.info(f"✅ HF Demo: Model '{name}' loaded.")
            except Exception as e:
                logger.error(f"❌ HF Demo: Failed to load model '{name}': {e}", exc_info=True)
                self.models[name] = None

    async def _get_perspective(self, model_name: str, query: str, system_prompt: str) -> str:
        llm = self.models.get(model_name)
        if not llm:
            return f"[Error: '{model_name}' core is offline.]"
        messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": query}]
        preset = INFERENCE_PRESETS.get(DEFAULT_INFERENCE_PRESET, {})
        try:
            # llama.cpp inference is blocking, so run it in a worker thread
            # to keep the event loop responsive.
            response = await asyncio.to_thread(llm.create_chat_completion, messages=messages, **preset)
            return response['choices'][0]['message']['content'].strip()
        except Exception as e:
            logger.error(f"HF Demo Inference error for '{model_name}': {e}")
            return f"[Error during inference for '{model_name}'.]"

    async def _generate_true_synthesis(self, query: str, perspectives: Dict[str, str]) -> str:
        summarizer_model = "gemma"
        if not self.models.get(summarizer_model):
            return "Synthesis AI core is offline."
        # Drop failed perspectives so error strings don't pollute the synthesis.
        valid_perspectives = {k: v for k, v in perspectives.items() if not v.startswith("[Error")}
        if not valid_perspectives:
            return "No valid perspectives were generated."
        synthesis_prompt = f"""
As the master intelligence of ZOTHEOS, perform a high-level synthesis of these perspectives on the user's query: "{query}".
Your goal is to find the deeper truth. Analyze the viewpoints to:
1. Identify the core theme.
2. Highlight the most significant tension between them.
3. Extract a unique "aha!" insight that emerges from their combination.
4. Conclude with a definitive, balanced statement.
Perspectives:
{json.dumps(valid_perspectives, indent=2)}
Your Final Synthesized Insight:
"""
        system_prompt = "You are a master synthesis AI. Create a clear, insightful summary from the provided texts."
        return await self._get_perspective(summarizer_model, synthesis_prompt, system_prompt)

    def _format_output(self, summary: str, perspectives: Dict[str, str]) -> str:
        output = f"## ✨ ZOTHEOS Final Synthesized Insight ✨\n\n{summary}\n\n### 🔬 Detailed Individual Perspectives\n\n"
        for name, text in perspectives.items():
            role = self.config["MODEL_ROLES"].get(name, "General")
            output += f"**Perspective from {name.capitalize()} ({role.capitalize()}):**\n{text}\n\n"
        return output.strip()

    async def process_query_with_fusion(self, query: str, **kwargs) -> str:
        online_models = [name for name, model in self.models.items() if model]
        if not online_models:
            return "[Critical Error: All AI cores are offline.]"
        self.models_last_queried = online_models
        # Fan the query out to every online core concurrently, pairing each
        # model with the system prompt for its configured role.
        tasks = {name: self._get_perspective(
            name, query,
            self.config["MODEL_ROLE_SYSTEM_PROMPTS"].get(self.config["MODEL_ROLES"].get(name, "general"), DEFAULT_SYSTEM_PROMPT)
        ) for name in online_models}
        responses = await asyncio.gather(*tasks.values())
        perspectives = dict(zip(tasks.keys(), responses))
        synthesis = await self._generate_true_synthesis(query, perspectives)
        final_output = self._format_output(synthesis, perspectives)
        if self.memory_bank:
            await self.memory_bank.store_memory_async(query=query, response=final_output)
        return final_output

    async def get_status_report(self) -> Dict[str, Any]:
        return {
            "status": "Online - Web Demo Mode",
            "loaded_models": [name for name, model in self.models.items() if model is not None],
        }