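"""Gradio UI for the Likable agent.

Launches the chat interface and manages the generated sandbox app
(sandbox/app.py) in a subprocess on an internal port (7861), exposed
under the /preview path of the main app.
"""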
import atexit | |
import os | |
import signal | |
import socket | |
import subprocess | |
import sys | |
import time | |
from pathlib import Path | |
import gradio as gr | |
from smolagents.agents import MultiStepAgent | |
from .ui_helpers import stream_to_gradio | |
from .utils import load_file | |
preview_process = None | |
PREVIEW_PORT = 7861 # Internal port for preview apps | |
last_restart_time = 0 # Track when we last restarted the preview app | |
RESTART_COOLDOWN = 10 # Minimum seconds between restarts | |
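# The preview subprocess and its restart bookkeeping are module-level globals,
# so every Gradio session shares the same sandbox app instance.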
def cleanup_preview_on_exit(): | |
"""Cleanup function called on program exit.""" | |
print("π§Ή Cleaning up preview app on exit...") | |
stop_preview_app() | |
def signal_handler(signum, frame): | |
"""Handle shutdown signals gracefully.""" | |
print(f"π Received signal {signum}, shutting down gracefully...") | |
cleanup_preview_on_exit() | |
sys.exit(0) | |
# Register signal handlers and exit handler | |
signal.signal(signal.SIGTERM, signal_handler) | |
signal.signal(signal.SIGINT, signal_handler) | |
atexit.register(cleanup_preview_on_exit) | |
def find_free_port(start_port=7860, max_ports=100): | |
"""Find an available TCP port, starting from a given port.""" | |
for port in range(start_port, start_port + max_ports): | |
try: | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
sock.bind(("127.0.0.1", port)) | |
return port | |
except OSError: | |
print(f" Port {port} is in use, trying next...") | |
return None | |
def get_preview_url(): | |
"""Get the appropriate preview URL with a cache-busting timestamp.""" | |
# Append a timestamp as a query parameter to break browser cache | |
return f"/preview/?_t={int(time.time() * 1000)}" | |
def is_port_available(port, host="0.0.0.0"): | |
"""Check if a port is available for binding.""" | |
try: | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
sock.bind((host, port)) | |
return True | |
except OSError: | |
return False | |
def find_app_py_in_sandbox(): | |
"""Find app.py file in sandbox folder and its subfolders.""" | |
sandbox_path = Path("sandbox") | |
if not sandbox_path.exists(): | |
return None | |
# Search for app.py files recursively | |
app_files = list(sandbox_path.rglob("app.py")) | |
if not app_files: | |
return None | |
# If multiple app.py files exist, throw an error | |
if len(app_files) > 1: | |
raise ValueError("Multiple app.py files found in sandbox directory") | |
return str(app_files[0]) | |
def save_file(path, new_text):
    """Write the edited text back to the selected file in the sandbox."""
    if path is None:
        gr.Warning("⚠️ No file selected.")
        return
    try:
        with open(path, "w", encoding="utf-8") as f:
            f.write(new_text)
        gr.Info(f"✅ Saved to: {path.split('sandbox/')[-1]}")
    except Exception as e:
        raise gr.Error(f"❌ Error saving: {e}") from e
def stop_preview_app(): | |
"""Stop the preview app subprocess if it's running.""" | |
global preview_process | |
if preview_process and preview_process.poll() is None: | |
print(f"π Stopping preview app process (PID: {preview_process.pid})...") | |
try: | |
preview_process.terminate() | |
preview_process.wait(timeout=5) | |
print("β Preview app stopped gracefully.") | |
except subprocess.TimeoutExpired: | |
preview_process.kill() | |
# Wait a bit longer for the kill to take effect | |
try: | |
preview_process.wait(timeout=2) | |
print("β οΈ Preview app force-killed after timeout.") | |
except subprocess.TimeoutExpired: | |
print("β οΈ Preview app may still be running after force-kill attempt.") | |
except Exception as e: | |
print(f"β Error stopping preview app: {e}") | |
finally: | |
preview_process = None | |
def start_preview_app(): | |
"""Start the preview app in a subprocess if it's not already running.""" | |
global preview_process, last_restart_time | |
# Check if preview app is already running and healthy | |
if preview_process and preview_process.poll() is None: | |
# Verify it's actually responsive on the port | |
try: | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: | |
sock.settimeout(1) | |
result = sock.connect_ex(("127.0.0.1", PREVIEW_PORT)) | |
if result == 0: | |
print( | |
f"β Preview app already running and healthy " | |
f"(PID: {preview_process.pid})" | |
) | |
# We don't need to return the URL here anymore | |
return True, "Preview is running" | |
except Exception: | |
pass | |
# Check cooldown period to avoid too frequent restarts | |
current_time = time.time() | |
if current_time - last_restart_time < RESTART_COOLDOWN: | |
remaining_cooldown = RESTART_COOLDOWN - (current_time - last_restart_time) | |
print( | |
f"β³ Preview app restart on cooldown, {remaining_cooldown:.1f}s remaining" | |
) | |
if preview_process and preview_process.poll() is None: | |
# If there's still a process running, return success | |
return True, "Preview is running" | |
else: | |
return ( | |
False, | |
f"Preview app on cooldown for {remaining_cooldown:.1f} more seconds", | |
) | |
# Stop any existing process before starting a new one | |
stop_preview_app() | |
# Update restart time | |
last_restart_time = current_time | |
# Wait for the port to become available (up to 5 seconds) | |
for i in range(10): # 10 attempts * 0.5 seconds = 5 seconds max | |
if is_port_available(PREVIEW_PORT): | |
print(f"β Port {PREVIEW_PORT} is available") | |
break | |
print(f"β³ Port {PREVIEW_PORT} still busy, waiting... (attempt {i + 1}/10)") | |
time.sleep(0.5) | |
else: | |
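        # This `else` belongs to the `for` loop above: it runs only if the
        # loop finished without hitting `break`, i.e. the port never freed up.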
print(f"β Port {PREVIEW_PORT} is still not available after 5 seconds") | |
return False, f"Port {PREVIEW_PORT} is not available" | |
app_file = find_app_py_in_sandbox() | |
if not app_file: | |
return False, "No `app.py` found in the `sandbox` directory." | |
print(f"π Starting preview app from `{app_file}` on port {PREVIEW_PORT}...") | |
try: | |
# Change to the directory containing the app file | |
app_dir = str(Path(app_file).parent) | |
preview_process = subprocess.Popen( | |
[ | |
"python", | |
"app.py", | |
"--server-port", | |
str(PREVIEW_PORT), | |
"--server-name", | |
"0.0.0.0", | |
"--root-path", | |
"/preview", | |
], | |
stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
text=True, | |
cwd=app_dir, # Set working directory to the app directory | |
) | |
# Give it a moment to start up | |
time.sleep(3) | |
# Check if process is still running | |
if preview_process.poll() is None: | |
print(f"β Preview app started successfully (PID: {preview_process.pid}).") | |
# Additional check: verify the process is actually listening on the port | |
time.sleep(2) # Give it a bit more time to fully initialize | |
if preview_process.poll() is None: | |
# Check if port is actually being used (reverse of availability check) | |
try: | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: | |
sock.settimeout(1) | |
result = sock.connect_ex(("127.0.0.1", PREVIEW_PORT)) | |
if result == 0: | |
print( | |
f"β Preview app is accepting connections on port " | |
f"{PREVIEW_PORT}" | |
) | |
return True, "Preview is running" | |
else: | |
print( | |
f"β Preview app started but not accepting connections " | |
f"on port {PREVIEW_PORT}" | |
) | |
# Get error output | |
try: | |
stdout, stderr = preview_process.communicate(timeout=1) | |
error_msg = f"STDOUT:\n{stdout}\nSTDERR:\n{stderr}" | |
print(f"Process output: {error_msg}") | |
except subprocess.TimeoutExpired: | |
print("Process still running but not responsive") | |
return False, "Preview app not accepting connections" | |
except Exception as e: | |
print(f"β Error checking port connection: {e}") | |
return False, f"Error verifying connection: {e}" | |
else: | |
stdout, stderr = preview_process.communicate() | |
error_msg = ( | |
f"Process exited during initialization. " | |
f"STDOUT:\n{stdout}\nSTDERR:\n{stderr}" | |
) | |
print(f"β {error_msg}") | |
return False, f"Preview app crashed during startup:\n{error_msg}" | |
else: | |
stdout, stderr = preview_process.communicate() | |
error_msg = f"STDOUT:\n{stdout}\nSTDERR:\n{stderr}" | |
print(f"β Failed to start preview app. Error:\n{error_msg}") | |
return False, f"Failed to start preview app:\n{error_msg}" | |
except Exception as e: | |
print(f"β Exception while starting preview app: {e}") | |
return False, f"Error starting preview app: {e}" | |
def create_iframe_preview(): | |
"""Create an iframe that loads the sandbox app.""" | |
print("π create_iframe_preview() called") | |
# First, check if existing process is healthy | |
if preview_process is not None: | |
healthy, status = check_preview_health() | |
print(f"π Health check: {status}") | |
if healthy: | |
print("β Preview app is healthy, using existing process") | |
iframe_html = ( | |
f'<iframe src="{get_preview_url()}" ' | |
'width="100%" height="715px"></iframe>' | |
) | |
return iframe_html | |
else: | |
print(f"β οΈ Preview app unhealthy: {status}, attempting restart...") | |
else: | |
print("π No preview process exists, starting new one") | |
# Try to start the preview app and show an iframe | |
success, message = start_preview_app() | |
print(f"π start_preview_app() result: success={success}, message={message}") | |
if success: | |
iframe_html = ( | |
f'<iframe src="{get_preview_url()}" width="100%" height="715px"></iframe>' | |
) | |
return iframe_html | |
else: | |
# Show a more user-friendly error message with retry option | |
error_html = f""" | |
<div style="color: #d32f2f; padding: 20px; text-align: center; | |
border: 1px solid #d32f2f; border-radius: 8px; | |
background: #ffebee;"> | |
<h3 style="color: #d32f2f;">π§ Preview App Temporarily Unavailable</h3> | |
<p style="color: #333333;"><strong>Status:</strong> {message}</p> | |
<p style="color: #333333;"> | |
The preview app is starting up. Please wait a few seconds | |
and try refreshing. | |
</p> | |
<button onclick="location.reload()" style=" | |
background: #1976d2; color: white; border: none; | |
padding: 8px 16px; border-radius: 4px; cursor: pointer;"> | |
Refresh Preview | |
</button> | |
</div> | |
""" | |
print(f"π Error in preview: {message}") | |
return error_html | |
def is_preview_running(): | |
"""Check if the preview app is running and accessible.""" | |
global preview_process | |
# First check if process exists | |
if preview_process is None or preview_process.poll() is not None: | |
return False | |
# Then check if it's actually responsive on the port | |
try: | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: | |
sock.settimeout(1) | |
result = sock.connect_ex(("127.0.0.1", PREVIEW_PORT)) | |
return result == 0 | |
except Exception: | |
return False | |
def check_preview_health(): | |
"""Check if the preview app is healthy and restart if needed.""" | |
global preview_process | |
if preview_process is None: | |
return False, "No preview process" | |
if preview_process.poll() is not None: | |
# Process has exited | |
try: | |
stdout, stderr = preview_process.communicate() | |
error_msg = f"Process exited. STDOUT:\n{stdout}\nSTDERR:\n{stderr}" | |
print(f"π¨ Preview process died: {error_msg}") | |
except Exception as e: | |
print(f"π¨ Preview process died: {e}") | |
preview_process = None | |
return False, "Process died" | |
# Check if responsive with multiple attempts and longer timeout | |
max_attempts = 3 | |
for attempt in range(max_attempts): | |
try: | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: | |
sock.settimeout(3) # Increased timeout from 1 to 3 seconds | |
result = sock.connect_ex(("127.0.0.1", PREVIEW_PORT)) | |
if result == 0: | |
return True, "Healthy" | |
else: | |
if attempt < max_attempts - 1: | |
print( | |
f"π Health check attempt {attempt + 1}/" | |
f"{max_attempts} failed, retrying..." | |
) | |
time.sleep(1) # Wait before retrying | |
else: | |
return False, "Not responsive on port after multiple attempts" | |
except Exception as e: | |
if attempt < max_attempts - 1: | |
print( | |
f"π Health check attempt {attempt + 1}/" | |
f"{max_attempts} failed with error: {e}, retrying..." | |
) | |
time.sleep(1) | |
else: | |
return False, f"Connection check failed: {e}" | |
return False, "Health check failed" | |
def ensure_preview_running(): | |
"""Ensure the preview app is running, start it if needed.""" | |
if not is_preview_running(): | |
start_preview_app() | |
def get_default_model_for_provider(provider: str) -> str: | |
"""Get the default model ID for a given provider.""" | |
provider_model_map = { | |
"Anthropic": "anthropic/claude-sonnet-4-20250514", | |
"OpenAI": "openai/gpt-4.1", | |
"Mistral": "mistral/devstral-small-latest", | |
# "SambaNova": "sambanova/Qwen3-32B", | |
"Hugging Face": "huggingface/together/Qwen/Qwen2.5-Coder-32B-Instruct", | |
"OpenRouter": "openrouter/deepseek/deepseek-chat-v3-0324:free", | |
} | |
return provider_model_map.get( | |
provider, "huggingface/together/Qwen/Qwen2.5-Coder-32B-Instruct" | |
) | |
def get_available_providers(): | |
"""Detect which API keys are available in environment variables.""" | |
env_vars = { | |
"Anthropic": "ANTHROPIC_API_KEY", | |
"OpenAI": "OPENAI_API_KEY", | |
"Hugging Face": "HUGGINGFACE_API_KEY", | |
# "SambaNova": "SAMBANOVA_API_KEY", | |
"Mistral": "MISTRAL_API_KEY", | |
"OpenRouter": "OPENROUTER_API_KEY", | |
} | |
available_providers = [] | |
for provider, env_var in env_vars.items(): | |
if os.getenv(env_var): | |
available_providers.append(provider) | |
return available_providers | |
def get_default_provider(): | |
"""Get the default provider based on available API keys.""" | |
available = get_available_providers() | |
# Priority order for default selection | |
priority_order = ["Anthropic", "OpenAI", "Mistral", "OpenRouter", "Hugging Face"] | |
for provider in priority_order: | |
if provider in available: | |
return provider | |
# If no keys are available, default to Anthropic | |
return "Anthropic" | |
def initialize_model_from_environment(): | |
"""Initialize the model configuration based on available API keys in environment.""" | |
default_provider = get_default_provider() | |
available_providers = get_available_providers() | |
current_model_id = os.getenv("MODEL_ID", "") | |
print(f"π Available providers: {available_providers}") | |
print(f"π Default provider: {default_provider}") | |
print(f"π Current MODEL_ID: {current_model_id if current_model_id else 'NOT_SET'}") | |
# If we have available providers, check if current model matches the | |
# default provider | |
if available_providers: | |
expected_model = get_default_model_for_provider(default_provider) | |
# If MODEL_ID doesn't match the expected model for available provider, | |
# override it | |
if current_model_id != expected_model: | |
print( | |
f"π§ MODEL_ID mismatch: expected {expected_model} for " | |
f"{default_provider}, got {current_model_id}" | |
) | |
print("π§ Configuring based on available providers...") | |
# Set the model for the default provider | |
if default_provider != "Hugging Face": | |
env_var_map = { | |
"Anthropic": "ANTHROPIC_API_KEY", | |
"OpenAI": "OPENAI_API_KEY", | |
"SambaNova": "SAMBANOVA_API_KEY", | |
"Mistral": "MISTRAL_API_KEY", | |
"OpenRouter": "OPENROUTER_API_KEY", | |
} | |
env_var_name = env_var_map.get(default_provider) | |
api_key = os.getenv(env_var_name) | |
print( | |
f"π§ Checking {default_provider} with key: " | |
f"{'SET' if api_key else 'NOT_SET'}" | |
) | |
if api_key: | |
os.environ["API_KEY"] = api_key | |
os.environ["MODEL_ID"] = expected_model | |
print( | |
f"π§ Auto-configured model: {default_provider}" | |
f"-> {expected_model}" | |
) | |
print("π§ Set API_KEY and MODEL_ID environment variables") | |
return True | |
else: | |
print(f"β No API key found for {default_provider}") | |
else: | |
print("π§ Default provider is Hugging Face, keeping current model") | |
else: | |
print(f"β MODEL_ID matches expected model for {default_provider}") | |
else: | |
print("βΉοΈ No available providers detected, keeping current model") | |
return False | |
def save_api_key(provider, api_key): | |
"""Save API key to environment variable and update model accordingly.""" | |
if not api_key.strip(): | |
return f"β οΈ Please enter a valid API key for {provider}" | |
# Map provider names to environment variable names | |
env_var_map = { | |
"Anthropic": "ANTHROPIC_API_KEY", | |
"OpenAI": "OPENAI_API_KEY", | |
"Hugging Face": "HUGGINGFACE_API_KEY", | |
"SambaNova": "SAMBANOVA_API_KEY", | |
"Mistral": "MISTRAL_API_KEY", | |
"OpenRouter": "OPENROUTER_API_KEY", | |
} | |
env_var_name = env_var_map.get(provider) | |
if env_var_name: | |
# Always set the provider-specific API key | |
os.environ[env_var_name] = api_key.strip() | |
# For non-Hugging Face providers, also set the generic API_KEY and MODEL_ID | |
# This ensures the main agent uses the correct model and API key | |
if provider != "Hugging Face": | |
os.environ["API_KEY"] = api_key.strip() | |
os.environ["MODEL_ID"] = get_default_model_for_provider(provider) | |
return ( | |
f"β {provider} API key saved successfully \n" | |
f"Model: {get_default_model_for_provider(provider)}" | |
) | |
else: | |
return f"β {provider} API key saved successfully" | |
else: | |
return f"β Unknown provider: {provider}" | |
def get_api_key_status(selected_llm_provider=None): | |
"""Get the status of all API keys, highlighting the selected provider.""" | |
if selected_llm_provider is None: | |
selected_llm_provider = get_default_provider() | |
env_vars = { | |
"Hugging Face": "HUGGINGFACE_API_KEY", | |
"Anthropic": "ANTHROPIC_API_KEY", | |
"OpenAI": "OPENAI_API_KEY", | |
"SambaNova": "SAMBANOVA_API_KEY", | |
"Mistral": "MISTRAL_API_KEY", | |
"OpenRouter": "OPENROUTER_API_KEY", | |
} | |
status = [] | |
available_providers = get_available_providers() | |
# Show status for all providers | |
for provider, env_var in env_vars.items(): | |
if os.getenv(env_var): | |
key = os.getenv(env_var) | |
masked_key = f"{key[:8]}...{key[-4:]}" if len(key) > 12 else "***" | |
model = get_default_model_for_provider(provider) | |
# Highlight the selected provider | |
if provider == selected_llm_provider: | |
status.append(f"π― {provider}: {masked_key} (Model: {model}) [ACTIVE]") | |
else: | |
status.append(f"β {provider}: {masked_key} (Model: {model})") | |
else: | |
model = get_default_model_for_provider(provider) | |
if provider == selected_llm_provider: | |
status.append(f"β {provider}: Not set (Would use: {model}) [SELECTED]") | |
else: | |
status.append(f"β {provider}: Not set (Would use: {model})") | |
# Show current active model | |
current_model = os.getenv("MODEL_ID", "Qwen/Qwen2.5-Coder-32B-Instruct") | |
status.append(f"π€ Current Active Model: {current_model}") | |
# Show summary | |
if available_providers: | |
status.append(f"π Available providers: {', '.join(available_providers)}") | |
else: | |
status.append("β οΈ No API keys detected in environment") | |
return "\n".join(status) | |
class GradioUI: | |
"""A one-line interface to launch your agent in Gradio""" | |
def __init__(self, agent: MultiStepAgent): | |
self.agent = agent | |
self.parent_id = None | |
def interact_with_agent(self, prompt, messages, session_state): | |
import gradio as gr | |
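        # parent_id groups the agent's intermediate steps (streamed via
        # stream_to_gradio below) under the single "Thinking..." message.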
self.parent_id = int(time.time() * 1000) | |
        # Store the agent in the session state for this user session
if "agent" not in session_state: | |
session_state["agent"] = self.agent | |
try: | |
messages.append( | |
gr.ChatMessage(role="user", content=prompt, metadata={"status": "done"}) | |
) | |
messages.append( | |
gr.ChatMessage( | |
role="assistant", | |
content="", | |
metadata={ | |
"id": self.parent_id, | |
"title": "π§ Thinking...", | |
"status": "pending", | |
}, | |
) | |
) | |
start_time = time.time() | |
yield messages | |
for msg in stream_to_gradio( | |
session_state["agent"], | |
task=prompt, | |
reset_agent_memory=False, | |
parent_id=self.parent_id, | |
): | |
if isinstance(msg, gr.ChatMessage): | |
messages.append(msg) | |
messages[-1].metadata["status"] = "done" | |
if msg.content.startswith("**Final answer:**"): | |
# Remove "**Final answer:**" prefix from the message content | |
if msg.content.startswith("**Final answer:**"): | |
msg.content = msg.content.replace("**Final answer:**\n", "") | |
# Set the parent message status to done when final | |
# answer is reached | |
for message in messages: | |
if ( | |
isinstance(message, gr.ChatMessage) | |
and message.metadata.get("id") == self.parent_id | |
): | |
message.metadata["status"] = "done" | |
message.metadata["title"] = ( | |
f"π§ Thought for {time.time() - start_time:.0f} " | |
"sec." | |
) | |
break | |
elif isinstance(msg, str): # Then it's only a completion delta | |
msg = msg.replace("<", r"\<").replace( | |
">", r"\>" | |
) # HTML tags seem to break Gradio Chatbot | |
if messages[-1].metadata["status"] == "pending": | |
messages[-1].content = msg | |
else: | |
messages.append( | |
gr.ChatMessage( | |
role="assistant", | |
content=msg, | |
metadata={"status": "pending"}, | |
) | |
) | |
yield messages | |
yield messages | |
except Exception as e: | |
yield messages | |
raise gr.Error(f"Error in interaction: {str(e)}") from e | |
def log_user_message(self, text_input, file_uploads_log): | |
import gradio as gr | |
return ( | |
text_input | |
+ ( | |
f"\nYou have been provided with these files, which might be " | |
f"helpful or not: {file_uploads_log}" | |
if len(file_uploads_log) > 0 | |
else "" | |
), | |
"", | |
gr.Button(interactive=False), | |
) | |
def launch(self, share: bool = True, **kwargs): | |
self.create_app().launch( | |
debug=True, | |
share=share, | |
**kwargs, | |
) | |
def create_app(self): | |
import gradio as gr | |
with gr.Blocks( | |
title="πLikable", | |
theme=gr.themes.Soft(), | |
fill_height=True, | |
fill_width=True, | |
analytics_enabled=False, | |
) as demo: | |
gr.Markdown("""# πLikable | |
## β οΈ IMPORTANT | |
**Please note:** This public demo space is shared among all users - everyone sees the same app and shares the same API key. For this reason, we've disabled the settings tab and are using a free version of DeepSeek V3 from OpenRouter. | |
**For private use or if you encounter rate limits**, we recommend duplicating this space to your own account and configuring it with your personal API keys, by either using Space sectres or uncommenting line 742-784 and 806-817 in `src/app.py` to enable the settings tab. | |
""") | |
with gr.Row(elem_classes="main-container"): | |
# Left side - Chat Interface | |
with gr.Column(scale=1, elem_classes="chat-container"): | |
avatar_url = ( | |
"http://em-content.zobj.net/source/apple/419/" | |
"growing-heart_1f497.png" | |
) | |
chatbot = gr.Chatbot( | |
avatar_images=(None, avatar_url), | |
type="messages", | |
resizable=True, | |
height=720, | |
) | |
with gr.Column(): | |
text_input = gr.Textbox( | |
placeholder="Ask Likable...", | |
scale=4, | |
container=False, | |
) | |
submit_btn = gr.Button("β", size="sm", variant="primary") | |
# Right side - Preview/Code/Settings Toggle | |
with gr.Column(scale=4, elem_classes="preview-container"): | |
with gr.Tab("Preview"): | |
preview_html = gr.HTML( | |
value=( | |
f'<iframe src="{get_preview_url()}" ' | |
'width="100%" height="715px"></iframe>' | |
), | |
elem_id="preview-container", | |
) | |
with gr.Tab("Code"): | |
with gr.Row(): | |
save_btn = gr.Button("Save", size="sm") | |
with gr.Row(equal_height=True): | |
file_explorer = gr.FileExplorer( | |
scale=1, | |
file_count="single", | |
value="app.py", | |
root_dir="sandbox", | |
) | |
code_editor = gr.Code( | |
scale=3, | |
value=load_file("sandbox/app.py"), | |
language="python", | |
visible=True, | |
interactive=True, | |
autocomplete=True, | |
max_lines=39, | |
) | |
# with gr.Tab("Settings"): | |
# gr.Markdown("## π API Keys") | |
# gr.Markdown( | |
# "Configure your API keys for different AI providers:" | |
# ) | |
# # API Key Status Display | |
# api_status = gr.Textbox( | |
# label="Current API Key Status", | |
# value=get_api_key_status(), | |
# interactive=False, | |
# lines=6, | |
# max_lines=8, | |
# ) | |
# gr.Markdown("---") | |
# # LLM Token with Provider Selection (now includes Hugging Face) | |
# with gr.Row(): | |
# llm_provider = gr.Dropdown( | |
# label="LLM Provider", | |
# choices=[ | |
# "Anthropic", | |
# "OpenAI", | |
# "Mistral", | |
# "SambaNova", | |
# "Hugging Face", | |
# ], | |
# value=get_default_provider(), | |
# scale=1, | |
# ) | |
# llm_token = gr.Textbox( | |
# label="API Key", | |
# placeholder="Enter your API key...", | |
# type="password", | |
# scale=3, | |
# ) | |
# llm_save_btn = gr.Button("Save", size="sm", scale=1) | |
# # Status message for API key operations | |
# api_message = gr.Textbox( | |
# label="Status", interactive=False, visible=False | |
# ) | |
# Add session state to store session-specific data | |
session_state = gr.State({}) | |
stored_messages = gr.State([]) | |
file_uploads_log = gr.State([]) | |
# Set up event handlers for API key saving | |
def save_and_update_status(provider, api_key, session_state=None): | |
message = save_api_key(provider, api_key) | |
status = get_api_key_status(provider) | |
# For non-Hugging Face providers, recreate the agent | |
if provider != "Hugging Face" and session_state is not None: | |
agent_message = self.recreate_agent_with_new_model( | |
session_state, provider | |
) | |
if agent_message: | |
message += f"\n{agent_message}" | |
return message, status, "" # Clear the input field | |
# llm_save_btn.click( | |
# lambda provider, key, sess_state: save_and_update_status( | |
# provider, key, sess_state | |
# ), | |
# inputs=[llm_provider, llm_token, session_state], | |
# outputs=[api_message, api_status, llm_token], | |
# ).then(lambda: gr.Textbox(visible=True), outputs=[api_message]) | |
# # Update status when LLM provider dropdown changes | |
# llm_provider.change( | |
# fn=get_api_key_status, inputs=[llm_provider], outputs=[api_status] | |
# ) | |
# Set up event handlers | |
file_explorer.change( | |
fn=load_file, inputs=file_explorer, outputs=code_editor | |
) | |
def refresh_all_with_preview_restart(): | |
"""Refresh everything including forcing a preview app restart | |
to pick up code changes.""" | |
print("π Forcing preview app restart to pick up code changes...") | |
# Force stop the current preview app to pick up code changes | |
stop_preview_app() | |
# Start fresh with new code | |
current_preview = create_iframe_preview() | |
# Update the file explorer and code editor | |
file_explorer_val = gr.FileExplorer( | |
scale=1, | |
file_count="single", | |
value="app.py", | |
root_dir="sandbox", | |
) | |
code_editor_val = gr.Code( | |
scale=3, | |
value=load_file("sandbox/app.py"), | |
language="python", | |
visible=True, | |
interactive=True, | |
autocomplete=True, | |
) | |
return file_explorer_val, code_editor_val, current_preview | |
def refresh_all(): | |
# Only refresh preview if it's not currently healthy | |
current_preview = None | |
if preview_process is not None: | |
healthy, status = check_preview_health() | |
if healthy: | |
# Preview is healthy, just return existing iframe | |
current_preview = ( | |
f'<iframe src="{get_preview_url()}" ' | |
'width="100%" height="715px"></iframe>' | |
) | |
else: | |
# Preview needs refresh | |
current_preview = create_iframe_preview() | |
else: | |
# No preview process, create one | |
current_preview = create_iframe_preview() | |
# Then, update the file explorer and code editor | |
file_explorer_val = gr.FileExplorer( | |
scale=1, | |
file_count="single", | |
value="app.py", | |
root_dir="sandbox", | |
) | |
code_editor_val = gr.Code( | |
scale=3, | |
value=load_file("sandbox/app.py"), | |
language="python", | |
visible=True, | |
interactive=True, | |
autocomplete=True, | |
) | |
return file_explorer_val, code_editor_val, current_preview | |
save_btn.click( | |
fn=save_file, | |
inputs=[file_explorer, code_editor], | |
).then( | |
fn=refresh_all_with_preview_restart, | |
outputs=[file_explorer, code_editor, preview_html], | |
) | |
text_input.submit( | |
self.log_user_message, | |
[text_input, file_uploads_log], | |
[stored_messages, text_input, submit_btn], | |
).then( | |
self.interact_with_agent, | |
[stored_messages, chatbot, session_state], | |
[chatbot], | |
).then( | |
fn=refresh_all_with_preview_restart, | |
outputs=[file_explorer, code_editor, preview_html], | |
).then( | |
lambda: ( | |
gr.Textbox( | |
interactive=True, | |
placeholder="Ask Likable...", | |
), | |
gr.Button(interactive=True), | |
), | |
None, | |
[text_input, submit_btn], | |
) | |
submit_btn.click( | |
self.log_user_message, | |
[text_input, file_uploads_log], | |
[stored_messages, text_input, submit_btn], | |
).then( | |
self.interact_with_agent, | |
[stored_messages, chatbot, session_state], | |
[chatbot], | |
).then( | |
fn=refresh_all_with_preview_restart, | |
outputs=[file_explorer, code_editor, preview_html], | |
).then( | |
lambda: ( | |
gr.Textbox( | |
interactive=True, | |
placeholder="Ask Likable....", | |
), | |
gr.Button(interactive=True), | |
), | |
None, | |
[text_input, submit_btn], | |
) | |
# Load the preview iframe when the app starts | |
demo.load(fn=create_iframe_preview, outputs=[preview_html]) | |
# Note: Removed demo.unload(stop_preview_app) as it was causing | |
# preview app restarts on every page reload, leading to 502 errors. | |
# We have proper cleanup via signal handlers and atexit handlers. | |
return demo | |
def recreate_agent_with_new_model(self, session_state, provider=None): | |
"""Recreate the agent with updated model configuration.""" | |
from .kiss_agent import KISSAgent | |
# Get the new model ID if provider is specified | |
if provider and provider != "Hugging Face": | |
model_id = get_default_model_for_provider(provider) | |
# Get API key from provider-specific environment variable | |
env_var_map = { | |
"Anthropic": "ANTHROPIC_API_KEY", | |
"OpenAI": "OPENAI_API_KEY", | |
"SambaNova": "SAMBANOVA_API_KEY", | |
"Mistral": "MISTRAL_API_KEY", | |
"OpenRouter": "OPENROUTER_API_KEY", | |
} | |
env_var_name = env_var_map.get(provider) | |
api_key = os.getenv(env_var_name) if env_var_name else None | |
if not api_key: | |
return f"β No API key found for {provider}" | |
# Create new agent with updated model | |
new_agent = KISSAgent(model_id=model_id, api_key=api_key) | |
session_state["agent"] = new_agent | |
return f"π Agent updated to use {provider} model: {model_id}" | |
return "" | |
if __name__ == "__main__": | |
import sys | |
from .kiss_agent import KISSAgent | |
# Initialize model configuration based on available API keys | |
print("π Checking for available API keys...") | |
initialize_model_from_environment() | |
# Create agent with explicit parameters from the updated environment | |
model_id = os.getenv("MODEL_ID", "Qwen/Qwen2.5-Coder-32B-Instruct") | |
api_key = os.getenv("API_KEY") | |
print(f"π€ Creating agent with model: {model_id}") | |
print(f"π Using API key: {'SET' if api_key else 'NOT_SET'}") | |
agent = KISSAgent(model_id=model_id, api_key=api_key) | |
# Start the preview app automatically when the main app starts | |
print("π Starting preview app automatically...") | |
success, message = start_preview_app() | |
if success: | |
print(f"β Preview app started: {message}") | |
else: | |
print(f"β Failed to start preview app: {message}") | |
# Parse command line arguments for server configuration | |
server_port_arg = 7860 # default | |
server_name = "127.0.0.1" # default | |
if "--server-port" in sys.argv: | |
port_idx = sys.argv.index("--server-port") | |
if port_idx + 1 < len(sys.argv): | |
server_port_arg = int(sys.argv[port_idx + 1]) | |
if "--server-name" in sys.argv: | |
name_idx = sys.argv.index("--server-name") | |
if name_idx + 1 < len(sys.argv): | |
server_name = sys.argv[name_idx + 1] | |
# Find an available port for the main app, starting with the desired one | |
server_port = find_free_port(server_port_arg) | |
if server_port is None: | |
print(f"β Could not find any available ports starting from {server_port_arg}") | |
sys.exit(1) | |
if server_port != server_port_arg: | |
print(f"β οΈ Port {server_port_arg} was busy. Running on free port: {server_port}") | |
GradioUI(agent).launch( | |
share=False, server_port=server_port, server_name=server_name | |
) | |