Spaces:
Sleeping
Sleeping
import gradio as gr | |
import asyncio | |
import logging | |
import os | |
import sys | |
import time | |
import html | |
import json | |
import tempfile | |
from typing import Optional, Dict, List, Tuple | |
# --- Constants & Logger (Your original, stable code) --- | |
APP_TITLE = "ZOTHEOS - Ethical Fusion AI" | |
FAVICON_FILENAME = "favicon_blackBW.ico" | |
LOGO_FILENAME = "zotheos_logo.png" | |
# Using your robust logger setup | |
def init_logger(): | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S") | |
logger_instance = logging.getLogger("ZOTHEOS_Interface") | |
if not logger_instance.handlers: | |
handler = logging.StreamHandler(sys.stdout); handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")); logger_instance.addHandler(handler) | |
return logger_instance | |
logger = init_logger() | |
# --- Asset Path Helper (Your original, stable code) --- | |
def get_asset_path(filename: str) -> Optional[str]: | |
base_dir = getattr(sys, '_MEIPASS', os.path.dirname(os.path.abspath(__file__))) | |
for path in [os.path.join(base_dir, 'assets', filename), os.path.join(base_dir, filename)]: | |
if os.path.exists(path): | |
logger.info(f"Asset found: '{filename}' -> '{path}'") | |
return path | |
logger.warning(f"Asset NOT FOUND: '{filename}'.") | |
return None | |
logo_path_verified = get_asset_path(LOGO_FILENAME) | |
favicon_path_verified = get_asset_path(FAVICON_FILENAME) | |
# --- Core Logic & AI System Initialization (Your original, stable code) --- | |
MainFusionPublic = None; initialization_error: Optional[Exception] = None; DEFAULT_INFERENCE_PRESET_INTERFACE = "balanced"; get_user_tier = lambda token: "error (auth module failed)" | |
try: | |
script_dir = os.path.dirname(os.path.abspath(__file__)); project_root = script_dir | |
if project_root not in sys.path: sys.path.insert(0, project_root); | |
from modules.main_fusion_public import MainFusionPublic | |
try: from modules.config_settings_public import DEFAULT_INFERENCE_PRESET as CONFIG_DEFAULT_PRESET; DEFAULT_INFERENCE_PRESET_INTERFACE = CONFIG_DEFAULT_PRESET | |
except ImportError: logger.warning("Could not import DEFAULT_INFERENCE_PRESET.") | |
from modules.user_auth import get_user_tier as imported_get_user_tier; get_user_tier = imported_get_user_tier; logger.info("β Successfully imported MainFusionPublic and user_auth.") | |
except Exception as e: | |
logger.error(f"β Error importing modules: {e}", exc_info=True); initialization_error = e | |
ai_system: Optional[MainFusionPublic] = None | |
if 'MainFusionPublic' in globals() and MainFusionPublic is not None and initialization_error is None: | |
try: logger.info("Initializing ZOTHEOS AI System..."); ai_system = MainFusionPublic(); logger.info("β ZOTHEOS AI System Initialized.") | |
except Exception as e_init: initialization_error = e_init; logger.error(f"β AI System Init Failed: {e_init}", exc_info=True); ai_system = None | |
elif initialization_error is None: initialization_error = ModuleNotFoundError("MainFusionPublic could not be imported."); logger.error(f"β {initialization_error}") | |
# --- TIER_FEATURES Dictionary (Your original, stable code) --- | |
TIER_FEATURES = {"free": {"display_name": "Free Tier", "memory_enabled": False, "export_enabled": False}, "starter": {"display_name": "Starter Tier", "memory_enabled": True, "export_enabled": False}, "pro": {"display_name": "Pro Tier", "memory_enabled": True, "export_enabled": True}, "error (auth module failed)": {"display_name": "Error", "memory_enabled": False, "export_enabled": False}} | |
# | |
# --- NEW DESIGN: "ANCESTRAL TECH" (BLACK, WHITE, AZTEC TURQUOISE) --- | |
# | |
zotheos_base_css = """ | |
@import url('https://fonts.googleapis.com/css2?family=Teko:wght@400;600&family=Inter:wght@400;500&display=swap'); | |
:root { | |
--bg-main: #0A0A0A; | |
--bg-surface: #141414; | |
--text-primary: #E0E0E0; | |
--text-secondary: #707070; | |
--border-color: #282828; | |
--accent-turquoise: #40E0D0; | |
--accent-turquoise-dark: #32B3A6; | |
--font-display: 'Teko', 'Staatliches', sans-serif; | |
--font-body: 'Inter', sans-serif; | |
--radius: 4px; /* Sharp, geometric corners */ | |
--glow-shadow: 0 0 12px 0px rgba(64, 224, 208, 0.4); | |
} | |
/* --- Base Resets --- */ | |
footer, canvas { display: none !important; } | |
html, body { | |
background-color: var(--bg-main) !important; | |
font-family: var(--font-body); | |
color: var(--text-primary); | |
} | |
.gradio-container { | |
max-width: 840px !important; | |
margin: 0 auto !important; | |
padding: 2rem 1.5rem !important; | |
background-color: transparent !important; | |
} | |
/* --- Header & Typography --- */ | |
#header_column { text-align: center; margin-bottom: 2rem; } | |
#header_logo img { | |
max-height: 80px; | |
filter: brightness(0) invert(1); /* Ensure logo is pure white */ | |
text-shadow: 0 0 10px rgba(255,255,255,0.3); | |
} | |
#header_subtitle { | |
font-family: var(--font-display) !important; | |
font-size: 1.4rem !important; | |
letter-spacing: 2px; | |
color: var(--text-secondary); | |
text-transform: uppercase; | |
} | |
#welcome_message { | |
font-family: var(--font-display) !important; | |
font-size: 1.8rem !important; | |
letter-spacing: 1px; | |
text-transform: uppercase; | |
color: var(--text-primary); | |
border-top: 1px solid var(--border-color); | |
border-bottom: 1px solid var(--border-color); | |
padding: 0.5rem 0; | |
margin-bottom: 2rem; | |
} | |
/* --- Input & Interactive Elements --- */ | |
.gradio-textbox textarea, .gradio-textbox input { | |
background-color: var(--bg-surface) !important; | |
border: 1px solid var(--border-color) !important; | |
border-radius: var(--radius) !important; | |
color: var(--text-primary) !important; | |
transition: all 0.2s ease-in-out; | |
} | |
.gradio-textbox textarea:focus, .gradio-textbox input:focus { | |
border-color: var(--accent-turquoise) !important; | |
box-shadow: var(--glow-shadow); | |
background-color: #181818 !important; | |
} | |
#query_input textarea { min-height: 150px !important; font-size: 1rem; } | |
/* --- Buttons --- */ | |
.gradio-button { border-radius: var(--radius) !important; transition: all 0.2s ease; border: 1px solid transparent; } | |
#submit_button { | |
background: var(--accent-turquoise) !important; | |
color: #000 !important; | |
font-weight: 600 !important; | |
text-transform: uppercase; | |
letter-spacing: 1px; | |
border-color: var(--accent-turquoise) !important; | |
} | |
#submit_button:hover { background: var(--text-primary) !important; border-color: var(--text-primary) !important; transform: scale(1.02); } | |
#clear_button { background: var(--bg-surface) !important; color: var(--text-secondary) !important; border-color: var(--border-color) !important; } | |
#clear_button:hover { color: var(--text-primary) !important; border-color: var(--text-secondary) !important; } | |
#export_memory_button { background: transparent !important; color: var(--text-secondary) !important; border: 1px dashed var(--border-color); } | |
#export_memory_button.interactive_button_enabled { color: var(--accent-turquoise) !important; border-color: var(--accent-turquoise) !important; } | |
#export_memory_button.interactive_button_enabled:hover { background: rgba(64, 224, 208, 0.1) !important; } | |
/* --- Output & Accordions --- */ | |
#synthesized_summary_output, #fusion_output { | |
background-color: transparent !important; | |
border: 1px solid var(--border-color) !important; | |
border-radius: var(--radius) !important; | |
padding: 1rem 1.5rem !important; | |
} | |
.gradio-accordion > .label-wrap { | |
background-color: var(--bg-surface) !important; | |
border: 1px solid var(--border-color) !important; | |
border-radius: var(--radius) !important; | |
} | |
/* --- Tier Status & Footer --- */ | |
#tier_status_display small { color: var(--text-secondary) !important; font-size: 0.8rem; } | |
#footer_attribution { font-size: 0.75rem; color: #444; text-align: center; margin-top: 3rem; } | |
""" | |
# --- All Helper Functions (Your original, stable code) --- | |
def render_memory_entries_as_html(memory_entries: List[dict]) -> str: | |
# This function is fine as-is. The new CSS will style the output. | |
if not memory_entries: return "<div class='memory-entry'>No stored memory entries or memory disabled.</div>" | |
html_blocks = []; | |
for entry in reversed(memory_entries[-5:]): | |
query = html.escape(entry.get("query", "N/A")); ts_iso = entry.get("metadata", {}).get("timestamp_iso", "Unknown"); | |
try: dt_obj = time.strptime(ts_iso, '%Y-%m-%dT%H:%M:%SZ'); formatted_ts = time.strftime('%Y-%m-%d %H:%M UTC', dt_obj) | |
except: formatted_ts = html.escape(ts_iso) | |
models_list = entry.get("metadata", {}).get("active_models_queried", []); models_str = html.escape(", ".join(models_list)) if models_list else "N/A" | |
summary_raw = entry.get("metadata", {}).get("synthesized_summary_text", "No summary."); summary = html.escape(summary_raw[:300]) + ("..." if len(summary_raw) > 300 else "") | |
html_blocks.append(f"<div class='memory-entry'><p><strong>π TS:</strong> {formatted_ts}</p><p><strong>π§ Models:</strong> {models_str}</p><div><strong>β Q:</strong><div class='query-text'>{query}</div></div><div><strong>π Sum:</strong><div class='summary-text'>{summary}</div></div></div>") | |
return "\n".join(html_blocks) | |
def format_tier_status_for_ui(tier_name: str) -> str: | |
features = TIER_FEATURES.get(tier_name, TIER_FEATURES["error (auth module failed)"]) | |
return f"<small>Tier: **{features['display_name']}** | Memory: {'β ' if features['memory_enabled'] else 'β'} | Export: {'β ' if features['export_enabled'] else 'β'}</small>" | |
def update_tier_display_and_features_ui(user_token_str: Optional[str]) -> Tuple: | |
tier_name = get_user_tier(user_token_str or "") | |
tier_features = TIER_FEATURES.get(tier_name, TIER_FEATURES["free"]) | |
return (gr.update(value=format_tier_status_for_ui(tier_name)), gr.update(interactive=tier_features["export_enabled"], elem_classes="interactive_button_enabled" if tier_features["export_enabled"] else "")) | |
def clear_all_fields(): | |
default_tier_name = get_user_tier(""); default_features = TIER_FEATURES.get(default_tier_name, TIER_FEATURES["free"]) | |
return ("", "", gr.update(value=format_tier_status_for_ui(default_tier_name)), "Detailed perspectives...", "Synthesized insight...", "**Status:** Awaiting Input", "No memory entries loaded.", gr.update(interactive=default_features["export_enabled"], elem_classes="interactive_button_enabled" if default_features["export_enabled"] else ""), None) | |
async def export_user_memory_data_for_download(user_token_str: Optional[str]) -> Optional[str]: | |
# This function is fine as-is. | |
try: | |
if not (tier_features := TIER_FEATURES.get(get_user_tier(user_token_str or ""), TIER_FEATURES["free"]))['export_enabled']: gr.Warning("π« Export is a Pro Tier feature."); return None | |
if not ai_system or not hasattr(ai_system, 'memory_bank'): gr.Error("β οΈ Memory system unavailable."); return None | |
memory_data = await ai_system.memory_bank.get_all_memories_for_export_async() if hasattr(ai_system.memory_bank, 'get_all_memories_for_export_async') else ai_system.memory_bank.get_all_memories_for_export() | |
if not memory_data: gr.Info("βΉοΈ No memory to export."); return None | |
with tempfile.NamedTemporaryFile(delete=False, mode="w", suffix=".json", encoding="utf-8") as f: f.write(json.dumps(memory_data, indent=2)); return f.name | |
except Exception as e: logger.error(f"β Export error: {e}", exc_info=True); gr.Error(f"β οΈ Export error: {e}"); return None | |
# --- Build Gradio Interface (Using your stable layout with the new CSS) --- | |
def build_interface(logo_path_param: Optional[str], favicon_path_param: Optional[str]) -> gr.Blocks: | |
theme = gr.themes.Base(font=[gr.themes.GoogleFont("Inter"), "sans-serif"]) | |
initial_tier_name = get_user_tier("") | |
initial_tier_features = TIER_FEATURES.get(initial_tier_name, TIER_FEATURES["free"]) | |
# The favicon_path is correctly passed here, which will make the logo appear in the browser tab. | |
with gr.Blocks(theme=theme, css=zotheos_base_css, title=APP_TITLE, favicon_path=favicon_path_param) as demo: | |
with gr.Column(elem_classes="centered-container"): | |
# Using your proven layout structure | |
with gr.Column(elem_id="header_column"): | |
if logo_path_param: gr.Image(value=logo_path_param, elem_id="header_logo", show_label=False, container=False, interactive=False) | |
gr.Markdown("Ethical Fusion AI for Synthesized Intelligence", elem_id="header_subtitle") | |
gr.Markdown("π‘ Fusing perspectives for deeper truth.", elem_id="welcome_message") | |
# Auth Row | |
with gr.Row(elem_id="auth_row", equal_height=False): | |
user_token_input = gr.Textbox(label="π Access Token", placeholder="Enter token...", type="password", elem_id="user_token_input", scale=3, container=False) | |
tier_status_display = gr.Markdown(value=format_tier_status_for_ui(initial_tier_name), elem_id="tier_status_display") | |
# Input Group | |
with gr.Group(elem_classes="interface-section"): | |
query_input_component = gr.Textbox(label="Your Inquiry:", placeholder="e.g., Analyze the ethical implications of AI in art...", lines=6, elem_id="query_input") | |
status_indicator_component = gr.HTML(elem_id="status_indicator", visible=False) | |
with gr.Row(): | |
submit_button_component = gr.Button("Process Inquiry", elem_id="submit_button", variant="primary", scale=2) | |
clear_button_component = gr.Button("Clear All", elem_id="clear_button", variant="secondary", scale=1) | |
# Output Group | |
with gr.Group(elem_classes="interface-section"): | |
synthesized_summary_component = gr.Markdown(elem_id="synthesized_summary_output", value="Synthesized insight will appear here...") | |
fused_response_output_component = gr.Markdown(elem_id="fusion_output", value="Detailed perspectives will appear here...") | |
# Tools & Footer (Your proven layout) | |
export_memory_button = gr.Button("β¬οΈ Export Memory Log", interactive=initial_tier_features["export_enabled"], elem_classes="interactive_button_enabled" if initial_tier_features["export_enabled"] else "") | |
with gr.Accordion("π‘ System Status & Active Models", open=False): active_models_display_component = gr.Markdown("**Status:** Initializing...") | |
with gr.Accordion("π§ Recent Interaction Chronicle (Last 5)", open=False): memory_display_panel_html = gr.HTML("No memory entries yet.") | |
gr.Markdown("--- \n ZOTHEOS Β© 2025 ZOTHEOS LLC. System Architect: David A. Garcia.", elem_id="footer_attribution") | |
# --- Event Handlers (Using your stable, robust logic) --- | |
# This is your original, stable processing wrapper. | |
async def process_query_wrapper_internal(query, token): | |
loading_html = f"<div class='cosmic-loading'><div class='orbital-spinner'></div><div class='thinking-text'>ZOTHEOS is synthesizing...</div></div>" | |
yield (gr.update(value=loading_html, visible=True), gr.update(interactive=False), gr.update(interactive=False), "", "", "", "Loading...") | |
if not ai_system or initialization_error: | |
error_html = f"π« Core System Error: {html.escape(str(initialization_error))}" | |
yield (gr.update(value=error_html, visible=True), gr.update(interactive=True), gr.update(interactive=True), "System error.", "System error.", "Offline", "Memory unavailable.") | |
return | |
response = await ai_system.process_query_with_fusion(query, user_token=token, fusion_mode_override=DEFAULT_INFERENCE_PRESET_INTERFACE) | |
# Your original parsing logic here | |
summary = "## β¨ ZOTHEOS Final Synthesized Insight β¨\n" + (res.split("## β¨ ZOTHEOS Final Synthesized Insight β¨")[-1] if "## β¨ ZOTHEOS Final Synthesized Insight β¨" in (res:=response) else res) | |
perspectives = "## π§ ZOTHEOS Fused Perspectives π§ \n" + (res.split("## β¨ ZOTHEOS Final Synthesized Insight β¨")[0] if "## β¨ ZOTHEOS Final Synthesized Insight β¨" in res else "Details in summary.") | |
tier_name = get_user_tier(token or "") | |
mem_html = render_memory_entries_as_html(await ai_system.memory_bank.retrieve_recent_memories_async(limit=5)) if TIER_FEATURES[tier_name]['memory_enabled'] and hasattr(ai_system, 'memory_bank') else "Memory disabled." | |
status_report = await ai_system.get_status_report() | |
status_md = f"**Active Cores:** `{', '.join(status_report.get('models_last_queried_for_perspectives', ['N/A']))}`" | |
yield (gr.update(visible=False), gr.update(interactive=True), gr.update(interactive=True), summary, perspectives, status_md, mem_html) | |
submit_button_component.click(fn=process_query_wrapper_internal, inputs=[query_input_component, user_token_input], outputs=[status_indicator_component, submit_button_component, clear_button_component, synthesized_summary_component, fused_response_output_component, active_models_display_component, memory_display_panel_html], show_progress="hidden") | |
clear_button_component.click(fn=clear_all_fields, outputs=[query_input_component, user_token_input, tier_status_display, fused_response_output_component, synthesized_summary_component, active_models_display_component, memory_display_panel_html, export_memory_button, None], show_progress="hidden") | |
user_token_input.change(fn=update_tier_display_and_features_ui, inputs=[user_token_input], outputs=[tier_status_display, export_memory_button], show_progress="hidden") | |
export_memory_button.click(fn=export_user_memory_data_for_download, inputs=[user_token_input], outputs=[]) | |
return demo | |
# --- Main Execution Block (Your original, stable code) --- | |
if __name__ == "__main__": | |
logger.info("--- Initializing ZOTHEOS Gradio Interface (V13 - Ancestral Tech) ---") | |
zotheos_interface = build_interface(logo_path_verified, favicon_path_verified) | |
logger.info("β ZOTHEOS Gradio UI built.") | |
if initialization_error: print(f"βΌοΈ WARNING: ZOTHEOS AI System failed to initialize fully: {initialization_error}") | |
elif not ai_system: print(f"βΌοΈ WARNING: ZOTHEOS AI System object is None. Backend will not function.") | |
else: print("β ZOTHEOS AI System appears initialized.") | |
launch_kwargs = {"server_name": "0.0.0.0", "server_port": int(os.getenv("PORT", 7860)), "share": os.getenv("GRADIO_SHARE", "False").lower() == "true"} | |
zotheos_interface.queue().launch(**launch_kwargs) |