diff --git "a/app.py" "b/app.py" new file mode 100644--- /dev/null +++ "b/app.py" @@ -0,0 +1,3165 @@ +import gradio as gr +import json +import sys +import os +import modal +import datetime +import base64 +from anthropic import Anthropic +import subprocess +import re +import time + +#prompts +modal_params = """# All possible @app.function() parameters +@app.function( + # Resource Configuration + cpu=float, # CPU cores (e.g., 4.0) + cpu=(float, float), # (request, limit) tuple + memory=int, # Memory in MB + memory=(int, int), # (request, limit) tuple + gpu="T4" | "L4" | "A10G" | "A100-40GB" | "A100-80GB" | "L40S" | "H100" | "H200" | "B200", + gpu="GPU_TYPE:count", # Multiple GPUs + gpu=["GPU_TYPE", "GPU_TYPE:count"], # GPU fallback list + ephemeral_disk=int, # Ephemeral disk in MB + + # Container Configuration + image=modal.Image, # Container image + secrets=[modal.Secret], # List of secrets + volumes={"/path": modal.Volume}, # Volume mounts + cloud_bucket_mounts={}, # Cloud storage mounts + + # Networking & Concurrency + allow_concurrent_inputs=int, # Concurrent requests per container + concurrency_limit=int, # Max concurrent containers + container_idle_timeout=int, # Idle timeout in seconds + timeout=int, # Function timeout in seconds + + # Scheduling & Retry + schedule=modal.Cron | modal.Period, # Scheduling + retries=int, # Retry attempts + retry=modal.Retry, # Custom retry policy + + # Environment & Runtime + environment=dict, # Environment variables + keep_warm=int, # Warm containers count + architecture="x86_64" | "arm64", # CPU architecture + cloud="aws" | "gcp", # Cloud provider + + # Batch Processing + batch_max_size=int, # Max batch size + + # Metadata + name=str, # Function name + is_generator=bool, # Generator function flag +) +""" + +modal_demo = """ +import modal + +app = modal.App("gradio-app") +web_image = modal.Image.debian_slim(python_version="3.12").pip_install( + "fastapi[standard]==0.115.4", + "gradio~=5.7.1", + "pillow~=10.2.0", +) + +@app.function( + image=web_image, + min_containers=1, + scaledown_window=60 * 20, + # gradio requires sticky sessions + # so we limit the number of concurrent containers to 1 + # and allow it to scale to 100 concurrent inputs + max_containers=1, +) +@modal.concurrent(max_inputs=100) +@modal.asgi_app() +def ui(): + + import gradio as gr + from fastapi import FastAPI + from gradio.routes import mount_gradio_app + import time + + def greet(name, intensity): + time.sleep(5) # Simulating processing time + return "Hello, " + name + "!" * int(intensity) + + demo = gr.Interface( + fn=greet, + inputs=["text", "slider"], + outputs=["text"], + ) + demo.queue(max_size=5) # Enable queue for handling multiple request + + return mount_gradio_app(app=FastAPI(), blocks=demo, path="/") +""" + +all_nodes = ''' +example workflow +{ + "workflow_id": "simple-chatbot-v1", + "workflow_name": "Simple Chatbot", + "nodes": [ + { + "id": "ChatInput-1", + "type": "ChatInput", + "data": { + "display_name": "User's Question", + "template": { + "input_value": { + "display_name": "Input", + "type": "string", + "value": "What is the capital of France?", + "is_handle": true + } + } + }, + "resources": { + "cpu": 0.1, + "memory": "128Mi", + "gpu": "none" + } + }, + { + "id": "Prompt-1", + "type": "Prompt", + "data": { + "display_name": "System Prompt", + "template": { + "prompt_template": { + "display_name": "Template", + "type": "string", + "value": "You are a helpful geography expert. The user asked: {input_value}", + "is_handle": true + } + } + }, + "resources": { + "cpu": 0.1, + "memory": "128Mi", + "gpu": "none" + } + }, + { + "id": "OpenAI-1", + "type": "OpenAIModel", + "data": { + "display_name": "OpenAI gpt-4o-mini", + "template": { + "model": { + "display_name": "Model", + "type": "options", + "options": ["gpt-4o", "gpt-4o-mini", "gpt-3.5-turbo"], + "value": "gpt-4o-mini" + }, + "api_key": { + "display_name": "API Key", + "type": "SecretStr", + "required": true, + "env_var": "OPENAI_API_KEY" + }, + "prompt": { + "display_name": "Prompt", + "type": "string", + "is_handle": true + } + } + }, + "resources": { + "cpu": 0.5, + "memory": "256Mi", + "gpu": "none" + } + }, + { + "id": "ChatOutput-1", + "type": "ChatOutput", + "data": { + "display_name": "Final Answer", + "template": { + "response": { + "display_name": "Response", + "type": "string", + "is_handle": true + } + } + }, + "resources": { + "cpu": 0.1, + "memory": "128Mi", + "gpu": "none" + } + } + ], + "edges": [ + { + "source": "ChatInput-1", + "source_handle": "input_value", + "target": "Prompt-1", + "target_handle": "prompt_template" + }, + { + "source": "Prompt-1", + "source_handle": "prompt_template", + "target": "OpenAI-1", + "target_handle": "prompt" + }, + { + "source": "OpenAI-1", + "source_handle": "response", + "target": "ChatOutput-1", + "target_handle": "response" + } + ] +} + + +## input node +{ + "id": "Input-1", + "type": "Input", + "data": { + "display_name": "Source Data", + "template": { + "data_type": { + "display_name": "Data Type", + "type": "options", + "options": ["string", "image", "video", "audio", "file"], + "value": "string" + }, + "value": { + "display_name": "Value or Path", + "type": "string", + "value": "This is the initial text." + }, + "data": { + "display_name": "Output Data", + "type": "object", + "is_handle": true + } + } + }, + "resources": { + "cpu": 0.1, + "memory": "128Mi", + "gpu": "none" + } +} + + +from typing import Any, Dict + +def process_input(data_type: str, value: Any) -> Dict[str, Any]: + Packages the source data and its type for downstream nodes. + """ + # The output is a dictionary containing both the type and the data/path. + # This gives the next node context on how to handle the value. + """ + output_package = { + "type": data_type, + "value": value + } + return {"data": output_package} + +process_input("string", "hi") + +## output node +"""{ + "id": "Output-1", + "type": "Output", + "data": { + "display_name": "Final Result", + "template": { + "input_data": { + "display_name": "Input Data", + "type": "object", + "is_handle": true + } + } + }, + "resources": { + "cpu": 0.1, + "memory": "128Mi", + "gpu": "none" + } +}""" + +from typing import Any, Dict + +def process_output(input_data: Dict[str, Any]) -> None: + """ + Receives the final data package and prints its contents. + """ + # Unpacks the dictionary received from the upstream node. + data_type = input_data.get("type", "unknown") + value = input_data.get("value", "No value provided") + + # print("--- Final Workflow Output ---") + # print(f" Data Type: {data_type}") + # print(f" Value/Path: {value}") + # print("-----------------------------") + + dont print output, just return it + +process_output({'type': 'string', 'value': 'hi'}) + +## api request node +"""{ + "id": "APIRequest-1", + "type": "APIRequest", + "data": { + "display_name": "Get User Data", + "template": { + "url": { + "display_name": "URL", + "type": "string", + "value": "https://api.example.com/users/1" + }, + "method": { + "display_name": "Method", + "type": "options", + "options": ["GET", "POST", "PUT", "DELETE"], + "value": "GET" + }, + "headers": { + "display_name": "Headers (JSON)", + "type": "string", + "value": "{\"Authorization\": \"Bearer YOUR_TOKEN\"}" + }, + "body": { + "display_name": "Request Body", + "type": "object", + "is_handle": true + }, + "response": { + "display_name": "Response Data", + "type": "object", + "is_handle": true + } + } + }, + "resources": { + "cpu": 0.2, + "memory": "256Mi", + "gpu": "none" + } +}""" + +import requests +import json +from typing import Any, Dict + +def process_api_request(url: str, method: str, headers: str, body: Dict = None) -> Dict[str, Any]: + """ + Performs an HTTP request and returns the JSON response. + """ + try: + parsed_headers = json.loads(headers) + except json.JSONDecodeError: + print("Warning: Headers are not valid JSON. Using empty headers.") + parsed_headers = {} + + try: + response = requests.request( + method=method, + url=url, + headers=parsed_headers, + json=body, + timeout=10 # 10-second timeout + ) + # Raise an exception for bad status codes (4xx or 5xx) + response.raise_for_status() + + # The output is a dictionary containing the JSON response. + return {"response": response.json()} + + except requests.exceptions.RequestException as e: + print(f"Error during API request: {e}") + # Return an error structure on failure + return {"response": {"error": str(e), "status_code": getattr(e.response, 'status_code', 500)}} + +url = "https://jsonplaceholder.typicode.com/posts" +method = "GET" +headers = "{}" # empty JSON headers +body = None # GET requests typically don't send a JSON body + +result = process_api_request(url, method, headers, body) +print(result) + +url = "https://jsonplaceholder.typicode.com/posts" +method = "POST" +headers = '{"Content-Type": "application/json"}' +body = { + "title": "foo", + "body": "bar", + "userId": 1 +} + +result = process_api_request(url, method, headers, body) +print(result) + +## react agent tool +import os +import asyncio +from typing import List, Dict, Any +from llama_index.core.agent import ReActAgent +from llama_index.core.tools import FunctionTool +from llama_index.llms.openai import OpenAI +from duckduckgo_search import DDGS + +# Set your API key +# os.environ["OPENAI_API_KEY"] = "your-api-key-here" + +class WorkflowReActAgent: + """Complete working ReAct Agent with your workflow tools""" + + def __init__(self, llm_model: str = "gpt-4o-mini"): + self.llm = OpenAI(model=llm_model, temperature=0.1) + self.tools = self._create_tools() + self.agent = ReActAgent.from_tools( + tools=self.tools, + llm=self.llm, + verbose=True, + max_iterations=8 # Reasonable limit + ) + + def _create_tools(self) -> List[FunctionTool]: + """Create tools that actually work and get used""" + + # ๐Ÿ” Web Search Tool (using your exact implementation) + def web_search(query: str) -> str: + """Search the web for current information""" + try: + with DDGS() as ddgs: + results = [] + gen = ddgs.text(query, safesearch="Off") + for i, result in enumerate(gen): + if i >= 3: # Limit results + break + results.append(f"โ€ข {result.get('title', '')}: {result.get('body', '')[:150]}...") + + if results: + return f"Search results: {'; '.join(results)}" + else: + return f"No results found for '{query}'" + + except Exception as e: + return f"Search error: {str(e)}" + + # ๐Ÿงฎ Calculator Tool + def calculate(expression: str) -> str: + """Calculate mathematical expressions safely""" + try: + # Simple and safe evaluation + allowed_chars = "0123456789+-*/().,_ " + if all(c in allowed_chars for c in expression): + result = eval(expression) + return f"Result: {result}" + else: + return f"Invalid expression: {expression}" + except Exception as e: + return f"Math error: {str(e)}" + + # ๐Ÿ Python Executor Tool + def execute_python(code: str) -> str: + """Execute Python code and return results""" + import sys + from io import StringIO + import traceback + + old_stdout = sys.stdout + sys.stdout = StringIO() + + try: + local_scope = {} + exec(code, {"__builtins__": __builtins__}, local_scope) + + output = sys.stdout.getvalue() + + # Get result from the last line if it's an expression + lines = code.strip().split('\n') + if lines: + try: + result = eval(lines[-1], {}, local_scope) + return f"Result: {result}\nOutput: {output}".strip() + except: + pass + + return f"Output: {output}".strip() if output else "Code executed successfully" + + except Exception as e: + return f"Error: {str(e)}" + finally: + sys.stdout = old_stdout + + # ๐ŸŒ API Request Tool + def api_request(url: str, method: str = "GET") -> str: + """Make HTTP API requests""" + import requests + try: + response = requests.request(method, url, timeout=10) + return f"Status: {response.status_code}\nResponse: {response.text[:300]}..." + except Exception as e: + return f"API error: {str(e)}" + + # Convert to FunctionTool objects + return [ + FunctionTool.from_defaults( + fn=web_search, + name="web_search", + description="Search the web for current information on any topic" + ), + FunctionTool.from_defaults( + fn=calculate, + name="calculate", + description="Calculate mathematical expressions and equations" + ), + FunctionTool.from_defaults( + fn=execute_python, + name="execute_python", + description="Execute Python code for data processing and calculations" + ), + FunctionTool.from_defaults( + fn=api_request, + name="api_request", + description="Make HTTP requests to APIs and web services" + ) + ] + + def chat(self, message: str) -> str: + """Chat with the ReAct agent""" + try: + response = self.agent.chat(message) + return str(response.response) + except Exception as e: + return f"Agent error: {str(e)}" + +# ๐Ÿš€ Usage Examples +def main(): + """Test the working ReAct agent""" + + agent = WorkflowReActAgent() + + test_queries = [ + "What's the current Bitcoin price and calculate 10% of it?", + "Search for news about SpaceX and tell me the latest", + "Calculate the compound interest: 1000 * (1.05)^10", + "Search for Python programming tips", + "What's 15 factorial divided by 12 factorial?", + "Find information about the latest iPhone and calculate its price in EUR if 1 USD = 0.92 EUR" + ] + + print("๐Ÿค– WorkflowReActAgent Ready!") + print("=" * 60) + + for i, query in enumerate(test_queries, 1): + print(f"\n๐Ÿ”ธ Query {i}: {query}") + print("-" * 50) + + response = agent.chat(query) + print(f"๐ŸŽฏ Response: {response}") + print("\n" + "="*60) + +if __name__ == "__main__": + main() + + +## web search tool +"""{ + "id": "WebSearch-1", + "type": "WebSearch", + "data": { + "display_name": "Search for News", + "template": { + "query": { + "display_name": "Search Query", + "type": "string", + "is_handle": true + }, + "results": { + "display_name": "Search Results", + "type": "object", + "is_handle": true + } + } + }, + "resources": { + "cpu": 0.2, + "memory": "256Mi", + "gpu": "none" + } +}""" + +# First, install duckduckgo_search: +# pip install duckduckgo_search + +import json +from typing import Any, Dict, List +from duckduckgo_search import DDGS + +def process_web_search(query: str, max_results: int = 10) -> Dict[str, Any]: + if not query: + return {"results": []} + + try: + # Use the DDGS client and its text() method + with DDGS() as ddgs: + gen = ddgs.text(query, safesearch="Off") + # Collect up to max_results items + results: List[Dict[str, str]] = [ + {"title": r.get("title", ""), "link": r.get("href", ""), "snippet": r.get("body", "")} + for _, r in zip(range(max_results), gen) + ] + return {"results": results} + + except Exception as e: + return {"results": {"error": str(e)}} + + +# import json +# from typing import Any +# from llama_index.tools import BaseTool, ToolMetadata + +# class DuckDuckGoSearchTool(BaseTool): +# """A LlamaIndex tool that proxies to process_web_search.""" +# metadata = ToolMetadata( +# name="duckduckgo_search", +# description="Performs a web search via DuckDuckGo and returns JSON results." +# ) + +# def __init__(self, max_results: int = 10): +# self.max_results = max_results + +# def _run(self, query: str) -> str: +# # Call our search function and return a JSON string +# results = process_web_search(query, max_results=self.max_results) +# return json.dumps(results) + +# async def _arun(self, query: str) -> str: +# # Async agents can await this +# results = process_web_search(query, max_results=self.max_results) +# return json.dumps(results) + +# from llama_index import GPTVectorStoreIndex, ServiceContext +# from llama_index.agent.react import ReactAgent +# from llama_index.tools import ToolConfig + +# # 1. Instantiate the tool +# search_tool = DuckDuckGoSearchTool(max_results=5) + +# # 2. Create an agent and register tools +# agent = ReactAgent( +# tools=[search_tool], +# service_context=ServiceContext.from_defaults() +# ) + +# # 3. Run the agent with a naturalโ€language prompt +# response = agent.run("What are the top news about renewable energy?") +# print(response) + + +process_web_search(query="devil may cry") + + +## execute python node +"""{ + "id": "ExecutePython-1", + "type": "ExecutePython", + "data": { + "display_name": "Custom Data Processing", + "template": { + "code": { + "display_name": "Python Code", + "type": "string", + "value": "def process(data):\n # Example: Extract titles from search results\n titles = [item['title'] for item in data]\n # The 'result' variable will be the output\n result = ', '.join(titles)\n return result" + }, + "input_vars": { + "display_name": "Input Variables", + "type": "object", + "is_handle": true + }, + "output_vars": { + "display_name": "Output Variables", + "type": "object", + "is_handle": true + } + } + }, + "resources": { + "cpu": 0.5, + "memory": "512Mi", + "gpu": "none" + } +}""" + +import sys +import traceback +from typing import Any, Dict + +def process_execute_python(code: str, input_vars: Dict[str, Any] = None) -> Dict[str, Any]: + """ + Executes a string of Python code within an isolated scope. + - If the code defines `process(data)`, calls it with `input_vars`. + - Otherwise, executes the code top-level and returns any printed output. + """ + if input_vars is None: + input_vars = {} + + # Capture stdout + from io import StringIO + old_stdout = sys.stdout + sys.stdout = StringIO() + + local_scope: Dict[str, Any] = {} + try: + # Execute user code + exec(code, {}, local_scope) + + if "process" in local_scope and callable(local_scope["process"]): + result = local_scope["process"](input_vars) + else: + # No process(): run as script + # (re-exec under a fresh namespace to capture prints) + exec(code, {}, {}) + result = None + + output = sys.stdout.getvalue() + return {"output_vars": result, "stdout": output} + + except Exception: + err = traceback.format_exc() + return {"output_vars": None, "error": err} + + finally: + sys.stdout = old_stdout + +# 1. Code with process(): +code1 = """ +def process(data): + return {"sum": data.get("x",0) + data.get("y",0)} +""" +print(process_execute_python(code1, {"x":5, "y":7})) +# โ†’ {'output_vars': {'sum': 12}, 'stdout': ''} + +# 2. Standalone code: +code2 = 'print("Hello, world!")' +print(process_execute_python(code2)) +# โ†’ {'output_vars': None, 'stdout': 'Hello, world!\n'} + +# import json +# from typing import Any +# from llama_index.tools import BaseTool, ToolMetadata + +# class ExecutePythonTool(BaseTool): +# """Executes arbitrary Python code strings in an isolated scope.""" +# metadata = ToolMetadata( +# name="execute_python", +# description="Runs user-supplied Python code. Requires optional `process(data)` or runs script." +# ) + +# def _run(self, code: str) -> str: +# # Call the executor and serialize the dict result +# result = process_execute_python(code) +# return json.dumps(result) + +# async def _arun(self, code: str) -> str: +# result = process_execute_python(code) +# return json.dumps(result) + +# from llama_index.agent.react import ReactAgent +# from llama_index import ServiceContext + +# tool = ExecutePythonTool() +# agent = ReactAgent(tools=[tool], service_context=ServiceContext.from_defaults()) + +# # Agent will call `execute_python` when needed. +# response = agent.run("Please run the Python code: print('Test')") +# print(response) + + +## conditional logix +"""{ + "id": "ConditionalLogic-1", + "type": "ConditionalLogic", + "data": { + "display_name": "Check User Role", + "template": { + "operator": { + "display_name": "Operator", + "type": "options", + "options": ["==", "!=", ">", "<", ">=", "<=", "contains", "not contains"], + "value": "==" + }, + "comparison_value": { + "display_name": "Comparison Value", + "type": "string", + "value": "admin" + }, + "input_value": { + "display_name": "Input to Check", + "type": "any", + "is_handle": true + }, + "true_output": { + "display_name": "Path if True", + "type": "any", + "is_handle": true + }, + "false_output": { + "display_name": "Path if False", + "type": "any", + "is_handle": true + } + } + }, + "resources": { + "cpu": 0.1, + "memory": "128Mi", + "gpu": "none" + } +}""" + +from typing import Any, Dict + +def process_conditional_logic(operator: str, comparison_value: str, input_value: Any) -> Dict[str, Any]: + """ + Evaluates a condition and returns the input value on the appropriate output handle. + """ + result = False + # Attempt to convert types for numeric comparison + try: + num_input = float(input_value) + num_comp = float(comparison_value) + except (ValueError, TypeError): + num_input, num_comp = None, None + + # Evaluate condition + if operator == '==' : result = input_value == comparison_value + elif operator == '!=': result = input_value != comparison_value + elif operator == '>' and num_input is not None: result = num_input > num_comp + elif operator == '<' and num_input is not None: result = num_input < num_comp + elif operator == '>=' and num_input is not None: result = num_input >= num_comp + elif operator == '<=' and num_input is not None: result = num_input <= num_comp + elif operator == 'contains': result = str(comparison_value) in str(input_value) + elif operator == 'not contains': result = str(comparison_value) not in str(input_value) + + # Return the input data on the correct output handle based on the result + if result: + # The key "true_output" matches the source_handle in the workflow edge + return {"true_output": input_value} + else: + # The key "false_output" matches the source_handle in the workflow edge + return {"false_output": input_value} + + ## wait node +"""{ + "id": "Wait-1", + "type": "Wait", + "data": { + "display_name": "Wait for 5 Seconds", + "template": { + "duration": { + "display_name": "Duration (seconds)", + "type": "number", + "value": 5 + }, + "passthrough_input": { + "display_name": "Passthrough Data In", + "type": "any", + "is_handle": true + }, + "passthrough_output": { + "display_name": "Passthrough Data Out", + "type": "any", + "is_handle": true + } + } + }, + "resources": { + "cpu": 0.1, + "memory": "128Mi", + "gpu": "none" + } +}""" + +import time +from typing import Any, Dict + +def process_wait(duration: int, passthrough_input: Any = None) -> Dict[str, Any]: + """ + Pauses execution for a given duration and then passes data through. + """ + time.sleep(duration) + # The output key "passthrough_output" matches the source_handle + return {"passthrough_output": passthrough_input} + +## chat node +"""{ + "id": "ChatModel-1", + "type": "ChatModel", + "data": { + "display_name": "AI Assistant", + "template": { + "provider": { + "display_name": "Provider", + "type": "options", + "options": ["OpenAI", "Anthropic"], + "value": "OpenAI" + }, + "model": { + "display_name": "Model Name", + "type": "string", + "value": "gpt-4o-mini" + }, + "api_key": { + "display_name": "API Key", + "type": "SecretStr", + "required": true, + "env_var": "OPENAI_API_KEY" + }, + "system_prompt": { + "display_name": "System Prompt (Optional)", + "type": "string", + "value": "You are a helpful assistant." + }, + "prompt": { + "display_name": "Prompt", + "type": "string", + "is_handle": true + }, + "response": { + "display_name": "Response", + "type": "string", + "is_handle": true + } + } + }, + "resources": { + "cpu": 0.5, + "memory": "256Mi", + "gpu": "none" + } +}""" + +import os +from typing import Any, Dict +from openai import OpenAI +from anthropic import Anthropic + +def process_chat_model(provider: str, model: str, api_key: str, prompt: str, system_prompt: str = "") -> Dict[str, Any]: + """ + Calls the specified chat model provider with a given prompt. + """ + response_text = "" + if provider == "OpenAI": + client = OpenAI(api_key=api_key) + messages = [] + if system_prompt: + messages.append({"role": "system", "content": system_prompt}) + messages.append({"role": "user", "content": prompt}) + + completion = client.chat.completions.create(model=model, messages=messages) + response_text = completion.choices[0].message.content + + elif provider == "Anthropic": + client = Anthropic(api_key=api_key) + message = client.messages.create( + model=model, + max_tokens=2048, + system=system_prompt, + messages=[{"role": "user", "content": prompt}] + ) + response_text = message.content[0].text + + return {"response": response_text} + + + +def test_openai(): + openai_key = os.getenv("OPENAI_API_KEY") + if not openai_key: + raise RuntimeError("Set the OPENAI_API_KEY environment variable.") + result = process_chat_model( + provider="OpenAI", + model="gpt-3.5-turbo", + api_key=openai_key, + system_prompt="You are a helpful assistant.", + prompt="What's the capital of France?" + ) + print("OpenAI response:", result["response"]) + + +def test_anthropic(): + anthropic_key = os.getenv("ANTHROPIC_API_KEY") + if not anthropic_key: + raise RuntimeError("Set the ANTHROPIC_API_KEY environment variable.") + result = process_chat_model( + provider="Anthropic", + model="claude-sonnet-4-20250514", + api_key=anthropic_key, + system_prompt="You are a concise assistant.", + prompt="List three benefits of renewable energy." + ) + print("Anthropic response:", result["response"]) + + +if __name__ == "__main__": + test_openai() + test_anthropic() + +## rag node 1 knowledge base +"""{ + "id": "KnowledgeBase-1", + "type": "KnowledgeBase", + "data": { + "display_name": "Create Product Docs KB", + "template": { + "kb_name": { + "display_name": "Knowledge Base Name", + "type": "string", + "value": "product-docs-v1" + }, + "source_type": { + "display_name": "Source Type", + "type": "options", + "options": ["Directory", "URL"], + "value": "URL" + }, + "path_or_url": { + "display_name": "Path or URL", + "type": "string", + "value": "https://docs.modal.com/get-started" + }, + "knowledge_base": { + "display_name": "Knowledge Base Out", + "type": "object", + "is_handle": true + } + } + }, + "resources": { + "cpu": 2.0, + "memory": "1Gi", + "gpu": "none" + } +}""" + +import os +from typing import Any, Dict +from llama_index.core import SimpleDirectoryReader, VectorStoreIndex, Settings +from llama_index.readers.web import SimpleWebPageReader +from llama_index.embeddings.huggingface import HuggingFaceEmbedding + +def process_knowledge_base(kb_name: str, source_type: str, path_or_url: str) -> Dict[str, Any]: + """ + Creates and persists a LlamaIndex VectorStoreIndex. + """ + # Use a high-quality, local model for embeddings + Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5") + + if source_type == "URL": + documents = SimpleWebPageReader(html_to_text=True).load_data([path_or_url]) + else: + documents = SimpleDirectoryReader(input_dir=path_or_url).load_data() + + index = VectorStoreIndex.from_documents(documents) + + storage_path = os.path.join("./storage", kb_name) + index.storage_context.persist(persist_dir=storage_path) + + # Return a reference object to the persisted index + return {"knowledge_base": {"name": kb_name, "path": storage_path}} + +## rag node 2 query +"""{ + "id": "RAGQuery-1", + "type": "RAGQuery", + "data": { + "display_name": "Retrieve & Augment Prompt", + "template": { + "query": { + "display_name": "Original Query", + "type": "string", + "is_handle": true + }, + "knowledge_base": { + "display_name": "Knowledge Base", + "type": "object", + "is_handle": true + }, + "rag_prompt": { + "display_name": "Augmented Prompt Out", + "type": "string", + "is_handle": true + } + } + }, + "resources": { + "cpu": 1.0, + "memory": "512Mi", + "gpu": "none" + } +}""" + +from typing import Any, Dict +from llama_index.core import StorageContext, load_index_from_storage, Settings +from llama_index.embeddings.huggingface import HuggingFaceEmbedding + +def process_rag_query(query: str, knowledge_base: Dict) -> Dict[str, Any]: + """ + Retrieves context from a knowledge base and creates an augmented prompt. + """ + Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5") + + # Load the index from the path provided by the KnowledgeBase node + storage_context = StorageContext.from_defaults(persist_dir=knowledge_base['path']) + index = load_index_from_storage(storage_context) + + retriever = index.as_retriever(similarity_top_k=3) + retrieved_nodes = retriever.retrieve(query) + + # Combine the retrieved text into a single context block + context_str = "\n\n".join([node.get_content() for node in retrieved_nodes]) + + # Construct the final prompt for the ChatModel + rag_prompt_template = ( + "Use the following context to answer the question. " + "If the answer is not in the context, say you don't know.\n\n" + "Context:\n{context}\n\n" + "Question: {question}" + ) + + final_prompt = rag_prompt_template.format(context=context_str, question=query) + + return {"rag_prompt": final_prompt} + +# --- Demo Execution --- +if __name__ == "__main__": + # 1. Build the KB from Modal docs + kb_result = process_knowledge_base( + kb_name="product-docs-v1", + source_type="URL", + path_or_url="https://modal.com/docs/guide" + ) + print("Knowledge Base Created:", kb_result) + + # 2. Run a RAG query + user_query = "How do I get started with Modal?" + rag_result = process_rag_query(user_query, kb_result["knowledge_base"]) + print("\nAugmented RAG Prompt:\n", rag_result["rag_prompt"]) + +## speech to text +"""{ + "id": "HFSpeechToText-1", + "type": "HFSpeechToText", + "data": { + "display_name": "Transcribe Audio (Whisper)", + "template": { + "model_id": { + "display_name": "Model ID", + "type": "string", + "value": "openai/whisper-large-v3" + }, + "audio_input": { + "display_name": "Audio Input", + "type": "object", + "is_handle": true + }, + "transcribed_text": { + "display_name": "Transcribed Text", + "type": "string", + "is_handle": true + } + } + }, + "resources": { + "cpu": 1.0, + "memory": "4Gi", + "gpu": "T4" + } +}""" + +import torch +from transformers import pipeline +from typing import Any, Dict + +# --- In a real Modal app, this would be structured like this: --- +# +# import modal +# image = modal.Image.debian_slim().pip_install("transformers", "torch", "librosa") +# stub = modal.Stub("speech-to-text-model") +# +# @stub.cls(gpu="T4", image=image) +# class WhisperModel: +# def __init__(self): +# device = "cuda" if torch.cuda.is_available() else "cpu" +# self.pipe = pipeline( +# "automatic-speech-recognition", +# model="openai/whisper-large-v3", +# torch_dtype=torch.float16, +# device=device, +# ) +# +# @modal.method() +# def run_inference(self, audio_path): +# # The function logic from below would be here. +# ... +# ------------------------------------------------------------------- + + +def process_hf_speech_to_text(model_id: str, audio_input: Dict[str, Any]) -> Dict[str, Any]: + """ + Transcribes an audio file using a Hugging Face ASR pipeline. + + NOTE: This function simulates the inference part of a stateful Modal class. + The model pipeline should be loaded only once. + """ + if audio_input.get("type") != "audio": + raise ValueError("Input must be of type 'audio'.") + + audio_path = audio_input["value"] + + # --- This part would be inside the Modal class method --- + + # In a real implementation, 'pipe' would be a class attribute (self.pipe) + # loaded in the __init__ or @enter method. + device = "cuda" if torch.cuda.is_available() else "cpu" + pipe = pipeline( + "automatic-speech-recognition", + model=model_id, + torch_dtype=torch.float16, + device=device, + ) + + outputs = pipe( + audio_path, + chunk_length_s=30, + batch_size=24, + return_timestamps=True, + ) + + return {"transcribed_text": outputs["text"]} + +## text to speech +"""{ + "id": "HFTextToSpeech-1", + "type": "HFTextToSpeech", + "data": { + "display_name": "Generate Speech", + "template": { + "model_id": { + "display_name": "Model ID", + "type": "string", + "value": "microsoft/speecht5_tts" + }, + "text_input": { + "display_name": "Text Input", + "type": "string", + "is_handle": true + }, + "audio_output": { + "display_name": "Audio Output", + "type": "object", + "is_handle": true + } + } + }, + "resources": { + "cpu": 1.0, + "memory": "4Gi", + "gpu": "T4" + } +}""" + +import torch +from transformers import pipeline +import soundfile as sf +from typing import Any, Dict + +def process_hf_text_to_speech(model_id: str, text_input: str) -> Dict[str, Any]: + """ + Synthesizes speech from text using a Hugging Face TTS pipeline. + + NOTE: Simulates the inference part of a stateful Modal class. + """ + # --- This part would be inside the Modal class method --- + + # The pipeline and embeddings would be loaded once in the class. + pipe = pipeline("text-to-speech", model=model_id, device="cuda") + + # SpeechT5 requires speaker embeddings for voice characteristics + from transformers import SpeechT5HifiGan + vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan").to("cuda") + + # A dummy embedding for a generic voice + import numpy as np + speaker_embedding = np.random.rand(1, 512).astype(np.float32) + + speech = pipe(text_input, forward_params={"speaker_embeddings": speaker_embedding}) + + # Save the output to a file and return the path + output_path = "/tmp/output.wav" + sf.write(output_path, speech["audio"], samplerate=speech["sampling_rate"]) + + return {"audio_output": {"type": "audio", "value": output_path}} + +## text generation +"""{ + "id": "HFTextGeneration-1", + "type": "HFTextGeneration", + "data": { + "display_name": "Generate with Mistral", + "template": { + "model_id": { + "display_name": "Model ID", + "type": "string", + "value": "mistralai/Mistral-7B-Instruct-v0.2" + }, + "max_new_tokens": { + "display_name": "Max New Tokens", + "type": "number", + "value": 256 + }, + "prompt": { + "display_name": "Prompt", + "type": "string", + "is_handle": true + }, + "generated_text": { + "display_name": "Generated Text", + "type": "string", + "is_handle": true + } + } + }, + "resources": { + "cpu": 2.0, + "memory": "24Gi", + "gpu": "A10G" + } +}""" + +import torch +from transformers import pipeline +from typing import Any, Dict + +def process_hf_text_generation(model_id: str, prompt: str, max_new_tokens: int) -> Dict[str, Any]: + """ + Generates text from a prompt using a Hugging Face LLM. + + NOTE: Simulates the inference part of a stateful Modal class. + """ + # --- This part would be inside the Modal class method --- + + # The pipeline is loaded once on container start. + pipe = pipeline( + "text-generation", + model=model_id, + torch_dtype=torch.bfloat16, + device_map="auto", + ) + + messages = [{"role": "user", "content": prompt}] + + # The pipeline needs the prompt to be formatted correctly for instruct models + formatted_prompt = pipe.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) + + outputs = pipe( + formatted_prompt, + max_new_tokens=max_new_tokens, + do_sample=True, + temperature=0.7, + top_k=50, + top_p=0.95, + ) + + # Extract only the generated part of the text + generated_text = outputs[0]["generated_text"] + # The output includes the prompt, so we remove it. + response_text = generated_text[len(formatted_prompt):] + + return {"generated_text": response_text} + +## image generation +"""{ + "id": "HFImageGeneration-1", + "type": "HFImageGeneration", + "data": { + "display_name": "Generate Image (SDXL)", + "template": { + "model_id": { + "display_name": "Base Model ID", + "type": "string", + "value": "stabilityai/stable-diffusion-xl-base-1.0" + }, + "lora_id": { + "display_name": "LoRA Model ID (Optional)", + "type": "string", + "value": "nerijs/pixel-art-xl" + }, + "prompt": { + "display_name": "Prompt", + "type": "string", + "is_handle": true + }, + "image_output": { + "display_name": "Image Output", + "type": "object", + "is_handle": true + } + } + }, + "resources": { + "cpu": 2.0, + "memory": "24Gi", + "gpu": "A10G" + } +}""" + +import torch +from diffusers import StableDiffusionXLPipeline +from typing import Any, Dict + +def process_hf_image_generation(model_id: str, prompt: str, lora_id: str = None) -> Dict[str, Any]: + """ + Generates an image using a Stable Diffusion pipeline, with optional LoRA. + + NOTE: Simulates the inference part of a stateful Modal class. + """ + # --- This part would be inside the Modal class method --- + + # The base pipeline is loaded once. + pipe = StableDiffusionXLPipeline.from_pretrained( + model_id, + torch_dtype=torch.float16, + variant="fp16", + use_safetensors=True + ).to("cuda") + + # If a LoRA is specified, load and fuse it. + # In a real app, this logic would be more complex to handle multiple LoRAs. + if lora_id: + pipe.load_lora_weights(lora_id) + pipe.fuse_lora() + + # Generate the image + image = pipe(prompt=prompt).images[0] + + output_path = "/tmp/generated_image.png" + image.save(output_path) + + return {"image_output": {"type": "image", "value": output_path}} + +## captioning image to text +"""{ + "id": "HFVisionModel-1", + "type": "HFVisionModel", + "data": { + "display_name": "Describe Image", + "template": { + "task": { + "display_name": "Task", + "type": "options", + "options": ["image-to-text"], + "value": "image-to-text" + }, + "model_id": { + "display_name": "Model ID", + "type": "string", + "value": "Salesforce/blip-image-captioning-large" + }, + "image_input": { + "display_name": "Image Input", + "type": "object", + "is_handle": true + }, + "result": { + "display_name": "Result", + "type": "string", + "is_handle": true + } + } + }, + "resources": { + "cpu": 1.0, + "memory": "8Gi", + "gpu": "T4" + } +}""" + +from transformers import pipeline +from PIL import Image +from typing import Any, Dict + +def process_hf_vision_model(task: str, model_id: str, image_input: Dict[str, Any]) -> Dict[str, Any]: + """ + Performs a vision-based task, like image captioning. + + NOTE: Simulates the inference part of a stateful Modal class. + """ + if image_input.get("type") != "image": + raise ValueError("Input must be of type 'image'.") + + image_path = image_input["value"] + + # --- This part would be inside the Modal class method --- + + # The pipeline is loaded once. + pipe = pipeline(task, model=model_id, device="cuda") + + # Open the image file + image = Image.open(image_path) + + result = pipe(image) + + # The output format for this pipeline is a list of dicts + # e.g., [{'generated_text': 'a cat sitting on a couch'}] + output_text = result[0]['generated_text'] + + return {"result": output_text} + +import os +from openai import OpenAI + +client = OpenAI( + base_url="https://api.studio.nebius.com/v1/", + api_key=os.environ.get("NEBIUS_API_KEY") +) + +response = client.images.generate( + model="black-forest-labs/flux-dev", + response_format="b64_json", + extra_body={ + "response_extension": "png", + "width": 1024, + "height": 1024, + "num_inference_steps": 28, + "negative_prompt": "", + "seed": -1 + }, + prompt="pokemon" +) + +print(response.to_json()) + + +## nebius image generation +"""{ + "id": "NebiusImage-1", + "type": "NebiusImage", + "data": { + "display_name": "Nebius Image Generation", + "template": { + "model": { + "display_name": "Model", + "type": "options", + "options": [ + "black-forest-labs/flux-dev", + "black-forest-labs/flux-schnell", + "stability-ai/sdxl" + ], + "value": "black-forest-labs/flux-dev" + }, + "api_key": { + "display_name": "Nebius API Key", + "type": "SecretStr", + "required": true, + "env_var": "NEBIUS_API_KEY" + }, + "prompt": { + "display_name": "Prompt", + "type": "string", + "is_handle": true + }, + "negative_prompt": { + "display_name": "Negative Prompt (Optional)", + "type": "string", + "value": "" + }, + "width": { + "display_name": "Width", + "type": "number", + "value": 1024 + }, + "height": { + "display_name": "Height", + "type": "number", + "value": 1024 + }, + "num_inference_steps": { + "display_name": "Inference Steps", + "type": "number", + "value": 28 + }, + "seed": { + "display_name": "Seed", + "type": "number", + "value": -1 + }, + "image_output": { + "display_name": "Image Output", + "type": "object", + "is_handle": true + } + } + }, + "resources": { + "cpu": 0.2, + "memory": "256Mi", + "gpu": "none" + } +}""" + +import os +import base64 +from typing import Any, Dict +from openai import OpenAI + +def process_nebius_image( + model: str, + api_key: str, + prompt: str, + negative_prompt: str = "", + width: int = 1024, + height: int = 1024, + num_inference_steps: int = 28, + seed: int = -1 +) -> Dict[str, Any]: + """ + Generates an image using the Nebius AI Studio API. + """ + if not api_key: + raise ValueError("Nebius API key is missing.") + + client = OpenAI( + base_url="https://api.studio.nebius.com/v1/", + api_key=api_key + ) + + try: + response = client.images.generate( + model=model, + response_format="b64_json", + prompt=prompt, + extra_body={ + "response_extension": "png", + "width": width, + "height": height, + "num_inference_steps": num_inference_steps, + "negative_prompt": negative_prompt, + "seed": seed + } + ) + + # Extract the base64 encoded string + b64_data = response.data[0].b64_json + + # Decode the string and save the image to a file + image_bytes = base64.b64decode(b64_data) + output_path = "/tmp/nebius_image.png" + with open(output_path, "wb") as f: + f.write(image_bytes) + + # Return a data package with the path to the generated image + return {"image_output": {"type": "image", "value": output_path}} + + except Exception as e: + print(f"Error calling Nebius API: {e}") + return {"image_output": {"error": str(e)}} + + ## mcp new +"""{ + "id": "MCPConnection-1", + "type": "MCPConnection", + "data": { + "display_name": "MCP Server Connection", + "template": { + "server_url": { + "display_name": "MCP Server URL", + "type": "string", + "value": "http://localhost:8000/sse", + "info": "URL to MCP server (HTTP/SSE or stdio command)" + }, + "connection_type": { + "display_name": "Connection Type", + "type": "dropdown", + "options": ["http", "stdio"], + "value": "http" + }, + "allowed_tools": { + "display_name": "Allowed Tools (Optional)", + "type": "list", + "info": "Filter specific tools. Leave empty for all tools" + }, + "api_key": { + "display_name": "API Key (Optional)", + "type": "SecretStr", + "env_var": "MCP_API_KEY" + }, + "mcp_tools_output": { + "display_name": "MCP Tools Output", + "type": "list", + "is_handle": true + } + } + }, + "resources": { + "cpu": 0.1, + "memory": "128Mi", + "gpu": "none" + } +} +""" + +"""{ + "id": "MCPAgent-1", + "type": "MCPAgent", + "data": { + "display_name": "MCP-Powered AI Agent", + "template": { + "mcp_tools_input": { + "display_name": "MCP Tools Input", + "type": "list", + "is_handle": true + }, + "llm_model": { + "display_name": "LLM Model", + "type": "dropdown", + "options": ["gpt-4", "gpt-3.5-turbo", "gpt-4o", "gpt-4o-mini"], + "value": "gpt-4o-mini" + }, + "system_prompt": { + "display_name": "System Prompt", + "type": "text", + "value": "You are a helpful AI assistant with access to various tools. Use the available tools to help answer user questions accurately.", + "multiline": true + }, + "user_query": { + "display_name": "User Query", + "type": "string", + "is_handle": true + }, + "max_iterations": { + "display_name": "Max Iterations", + "type": "int", + "value": 10 + }, + "agent_response": { + "display_name": "Agent Response", + "type": "string", + "is_handle": true + } + } + }, + "resources": { + "cpu": 0.5, + "memory": "512Mi", + "gpu": "none" + } +} +""" + +import asyncio +import os +from typing import List, Optional, Dict, Any +from llama_index.tools.mcp import BasicMCPClient, McpToolSpec, get_tools_from_mcp_url, aget_tools_from_mcp_url +from llama_index.core.tools import FunctionTool + +class MCPConnectionNode: + """Node to connect to MCP servers and retrieve tools""" + + def __init__(self): + self.client = None + self.tools = [] + + async def execute(self, + server_url: str, + connection_type: str = "http", + allowed_tools: Optional[List[str]] = None, + api_key: Optional[str] = None) -> Dict[str, Any]: + """ + Connect to MCP server and retrieve available tools + """ + try: + # Set API key if provided + if api_key: + os.environ["MCP_API_KEY"] = api_key + + print(f"๐Ÿ”Œ Connecting to MCP server: {server_url}") + + if connection_type == "http": + # Use LlamaIndex's built-in function to get tools[2] + tools = await aget_tools_from_mcp_url( + server_url, + allowed_tools=allowed_tools + ) + else: + # For stdio connections + self.client = BasicMCPClient(server_url) + mcp_tool_spec = McpToolSpec( + client=self.client, + allowed_tools=allowed_tools + ) + tools = await mcp_tool_spec.to_tool_list_async() + + self.tools = tools + + print(f"โœ… Successfully connected! Retrieved {len(tools)} tools:") + for tool in tools: + print(f" - {tool.metadata.name}: {tool.metadata.description}") + + return { + "success": True, + "tools_count": len(tools), + "tool_names": [tool.metadata.name for tool in tools], + "mcp_tools_output": tools + } + + except Exception as e: + print(f"โŒ Connection failed: {str(e)}") + return { + "success": False, + "error": str(e), + "mcp_tools_output": [] + } + +# Example usage +async def mcp_connection_demo(): + node = MCPConnectionNode() + + # Using a public MCP server (you'll need to replace with actual public servers) + result = await node.execute( + server_url="http://localhost:8000/sse", # Replace with public MCP server + connection_type="http", + allowed_tools=None # Get all tools + ) + + return result +from llama_index.core.agent import FunctionCallingAgentWorker, AgentRunner +from llama_index.llms.openai import OpenAI +from llama_index.core.tools import FunctionTool +from typing import List, Dict, Any +import os + +class MCPAgentNode: + """Node to create and run MCP-powered AI agents""" + + def __init__(self): + self.agent = None + self.tools = [] + + async def execute(self, + mcp_tools_input: List[FunctionTool], + user_query: str, + llm_model: str = "gpt-4o-mini", + system_prompt: str = "You are a helpful AI assistant.", + max_iterations: int = 10) -> Dict[str, Any]: + """ + Create and run MCP-powered agent using FunctionCallingAgent + """ + try: + if not mcp_tools_input: + return { + "success": False, + "error": "No MCP tools provided", + "agent_response": "No tools available to process the query." + } + + print(f"๐Ÿค– Creating agent with {len(mcp_tools_input)} tools...") + + # Initialize LLM[1] + llm = OpenAI( + model=llm_model, + api_key=os.getenv("OPENAI_API_KEY"), + temperature=0.1 + ) + + # Create function calling agent (more reliable than ReAct)[2] + agent_worker = FunctionCallingAgentWorker.from_tools( + tools=mcp_tools_input, + llm=llm, + verbose=True, + system_prompt=system_prompt + ) + + self.agent = AgentRunner(agent_worker) + + print(f"๐Ÿ’ญ Processing query: {user_query}") + + # Execute the query + response = self.agent.chat(user_query) + + return { + "success": True, + "agent_response": str(response.response), + "user_query": user_query, + "tools_used": len(mcp_tools_input) + } + + except Exception as e: + print(f"โŒ Agent execution failed: {str(e)}") + return { + "success": False, + "error": str(e), + "agent_response": f"Sorry, I encountered an error while processing your query: {str(e)}" + } + +# Example usage +async def mcp_agent_demo(tools: List[FunctionTool]): + node = MCPAgentNode() + + result = await node.execute( + mcp_tools_input=tools, + user_query="What tools do you have available and what can you help me with?", + llm_model="gpt-4o-mini", + system_prompt="You are a helpful AI assistant. Use your available tools to provide accurate and useful responses." + ) + + return result + + +example + +import asyncio +import os +from typing import List, Dict, Any +from llama_index.core.tools import FunctionTool +from llama_index.core.agent import FunctionCallingAgentWorker, AgentRunner +from llama_index.llms.openai import OpenAI + +class CompleteMCPWorkflowDemo: + """Complete demo of MCP workflow with connection and agent nodes""" + + def __init__(self): + self.connection_node = MCPConnectionNode() + self.agent_node = MCPAgentNode() + + # Set your OpenAI API key + # os.environ["OPENAI_API_KEY"] = "your-openai-api-key-here" + + async def create_mock_mcp_tools(self) -> List[FunctionTool]: + """ + Create mock MCP tools that simulate a real MCP server + Replace this with actual MCP server connection when available + """ + def get_weather(city: str, country: str = "US") -> str: + """Get current weather information for a city""" + weather_data = { + "london": "Cloudy, 15ยฐC, humidity 80%", + "paris": "Sunny, 22ยฐC, humidity 45%", + "tokyo": "Rainy, 18ยฐC, humidity 90%", + "new york": "Partly cloudy, 20ยฐC, humidity 55%" + } + result = weather_data.get(city.lower(), f"Weather data not available for {city}") + return f"Weather in {city}, {country}: {result}" + + def search_news(topic: str, limit: int = 5) -> str: + """Search for latest news on a given topic""" + news_items = [ + f"Breaking: New developments in {topic}", + f"Analysis: {topic} trends for 2025", + f"Expert opinion on {topic} industry changes", + f"Research shows {topic} impact on society", + f"Global {topic} market outlook" + ] + return f"Top {limit} news articles about {topic}:\n" + "\n".join(news_items[:limit]) + + def calculate_math(expression: str) -> str: + """Calculate mathematical expressions safely""" + try: + # Simple and safe evaluation + allowed_chars = "0123456789+-*/().,_ " + if all(c in allowed_chars for c in expression): + result = eval(expression) + return f"Result: {expression} = {result}" + else: + return f"Invalid expression: {expression}" + except Exception as e: + return f"Error calculating {expression}: {str(e)}" + + def get_company_info(company: str) -> str: + """Get basic company information""" + companies = { + "openai": "OpenAI - AI research company, creator of GPT models", + "microsoft": "Microsoft - Technology corporation, cloud computing and software", + "google": "Google - Search engine and technology company", + "amazon": "Amazon - E-commerce and cloud computing platform" + } + return companies.get(company.lower(), f"Company information not found for {company}") + + # Convert to FunctionTool objects[2] + tools = [ + FunctionTool.from_defaults(fn=get_weather), + FunctionTool.from_defaults(fn=search_news), + FunctionTool.from_defaults(fn=calculate_math), + FunctionTool.from_defaults(fn=get_company_info) + ] + + return tools + + async def run_complete_workflow(self): + """ + Run the complete MCP workflow demonstration + """ + print("๐Ÿš€ Starting Complete MCP Workflow Demo") + print("=" * 60) + + # Step 1: Setup MCP Connection (simulated) + print("\n๐Ÿ“ก Step 1: Setting up MCP Connection...") + + # In real implementation, this would connect to actual MCP server + mock_tools = await self.create_mock_mcp_tools() + + connection_result = { + "success": True, + "tools_count": len(mock_tools), + "tool_names": [tool.metadata.name for tool in mock_tools], + "mcp_tools_output": mock_tools + } + + if connection_result["success"]: + print(f"โœ… MCP Connection successful!") + print(f"๐Ÿ“‹ Retrieved {connection_result['tools_count']} tools:") + for tool_name in connection_result['tool_names']: + print(f" - {tool_name}") + else: + print(f"โŒ MCP Connection failed: {connection_result.get('error')}") + return + + # Step 2: Create and test MCP Agent + print(f"\n๐Ÿค– Step 2: Creating MCP-Powered Agent...") + + test_queries = [ + "What's the weather like in London?", + "Search for news about artificial intelligence", + "Calculate 15 * 8 + 32", + "Tell me about OpenAI company", + "What tools do you have and what can you help me with?" + ] + + for i, query in enumerate(test_queries, 1): + print(f"\n๐Ÿ’ฌ Query {i}: {query}") + print("-" * 40) + + agent_result = await self.agent_node.execute( + mcp_tools_input=connection_result["mcp_tools_output"], + user_query=query, + llm_model="gpt-4o-mini", + system_prompt="""You are a helpful AI assistant with access to weather, news, calculation, and company information tools. + +When a user asks a question: +1. Determine which tool(s) can help answer their question +2. Use the appropriate tool(s) to gather information +3. Provide a clear, helpful response based on the tool results + +Always be informative and explain what tools you used.""", + max_iterations=5 + ) + + if agent_result["success"]: + print(f"๐ŸŽฏ Agent Response:") + print(f"{agent_result['agent_response']}") + else: + print(f"โŒ Agent Error: {agent_result['error']}") + + print("\n" + "="*50) + +# Function to connect to real MCP servers when available +async def connect_to_real_mcp_server(server_url: str): + """ + Example of connecting to a real MCP server + Replace server_url with actual public MCP servers + """ + try: + from llama_index.tools.mcp import aget_tools_from_mcp_url + + print(f"๐Ÿ”Œ Attempting to connect to: {server_url}") + tools = await aget_tools_from_mcp_url(server_url) + + print(f"โœ… Connected successfully! Found {len(tools)} tools:") + for tool in tools: + print(f" - {tool.metadata.name}: {tool.metadata.description}") + + return tools + + except Exception as e: + print(f"โŒ Failed to connect to {server_url}: {e}") + return [] + +# Main execution +async def main(): + """Run the complete demo""" + + # Option 1: Run with mock tools (works immediately) + print("๐ŸŽฎ Running MCP Workflow Demo with Mock Tools") + demo = CompleteMCPWorkflowDemo() + await demo.run_complete_workflow() + + # Option 2: Try connecting to real MCP servers (uncomment when available) + # real_servers = [ + # "http://your-mcp-server.com:8000/sse", + # "https://api.example.com/mcp" + # ] + # + # for server_url in real_servers: + # tools = await connect_to_real_mcp_server(server_url) + # if tools: + # # Use real tools with agent + # agent_node = MCPAgentNode() + # result = await agent_node.execute( + # mcp_tools_input=tools, + # user_query="What can you help me with?", + # llm_model="gpt-4o-mini" + # ) + # print(f"Real MCP Agent Response: {result}") + +if __name__ == "__main__": + asyncio.run(main()) +''' + # all nodes plus python code + +external_modal_config = "" + +gradio_design_json = "" +base_prompt = """ +You are an expert at building and deploying gradio apps, the current project is a workflow builder which builds ai workflows which have to be deployed to Modal. + +You will think and use your powerful reasoning to develop the app. You will be given what all the nodes look like plus their paramters/properties along with suitable python code for each node. + +You will also be given all possible modal deployment configurations (most of the times a simple deployment configuration works) and also a modal gradio app demo. + +You will strictly need to follow the format and also ensure that there are no errors. + +You will only return the python code and nothing else. + +Sometimes, the user may give a gradio design image along with a json for the design of the gradio app, incase this is given you will follow that during app creation, if this is not there ignore this sentence. + +### Core Nodes: +- **Input**: Handles various data types (string, image, video, audio, file) +- **Output**: Displays final results +- **APIRequest**: Makes HTTP requests with headers and body +- **WebSearch**: Performs web searches using DuckDuckGo +- **ExecutePython**: Executes custom Python code +- **ConditionalLogic**: Implements if/else branching logic +- **Wait**: Adds delays to workflows +- **ChatModel**: Integrates OpenAI/Anthropic LLMs + +### AI/ML Nodes: +- **KnowledgeBase**: Creates RAG knowledge bases from URLs/directories +- **RAGQuery**: Performs retrieval-augmented generation +- **HFSpeechToText**: Transcribes audio using Whisper +- **HFTextToSpeech**: Generates speech from text +- **HFTextGeneration**: Generates text using Hugging Face models +- **HFImageGeneration**: Creates images using Stable Diffusion +- **HFVisionModel**: Performs image captioning/analysis +- **NebiusImage**: Generates images using Nebius API + +### Advanced Nodes: +- **MCPConnection**: Connects to MCP servers +- **MCPAgent**: Creates AI agents with MCP tools + +## Node Implementation Functions: +Each node has a corresponding `process_[node_type]()` function that takes the node's parameters and returns results. + +## Workflow Structure: +{ +"workflow_id": "example-v1", +"workflow_name": "Example Workflow", +"nodes": [...], +"edges": [ +{ +"source": "Node-1", +"source_handle": "output_field", +"target": "Node-2", +"target_handle": "input_field" +} +] +} + +here are some of the requirements you can refer: +# Core workflow framework +pydantic==2.11.0 # Prevents the schema validation error make sure to add this for all apps +gradio==5.33.0 + +# AI/LLM Dependencies +llama-index # Latest is compatible with Python <3.12 + +# LlamaIndex Extensions +llama-index-llms-openai +llama-index-llms-anthropic +llama-index-embeddings-huggingface +llama-index-readers-web +llama-index-tools-requests +sentence-transformers + +# Web Search & API Tools +duckduckgo-search + +# MCP Integration +fastmcp + +if youre not sure use the version you are sure with. + +you are free to make assumptions but stick to the workflow. (Also you just have to use the workflow node corresponding code as a reference, while youre making the app make sure there are no errors especially data type errors) + +Also if there are any secrets ask the user to use thier own in the gradio app, also use the latest modal code (try to keep it simple and avoid too many params) + +also bro first do the module installations then imports otherwise the app will fail and one more thing, the app should be named after what the workflow is named. + +no errors please! + +Bro this is very important!!, first run the library installations, after that you import it!!!, make sure you dont make a mistake here + +you may now create the gradio app, +here are your inputs: +""" + +# Initialize Anthropic client +try: + client = Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY")) + ANTHROPIC_AVAILABLE = True + print("โœ… Anthropic client initialized successfully!") +except Exception as e: + ANTHROPIC_AVAILABLE = False + print(f"โŒ Anthropic client initialization failed: {e}") + +# Get the current directory and potential component paths +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(current_dir) + +# Try multiple possible locations for components +possible_paths = [ + parent_dir, # Parent directory + current_dir, # Current directory + os.path.join(parent_dir, 'workflowbuilder'), + os.path.join(parent_dir, 'gradiodesigner'), + os.path.join(parent_dir, 'iframecomponent') +] + +# Add all possible paths to sys.path +for path in possible_paths: + if os.path.exists(path) and path not in sys.path: + sys.path.insert(0, path) + +# Try to import components with correct package names +COMPONENTS_AVAILABLE = {} + +try: + from gradio_workflowbuilder import WorkflowBuilder + COMPONENTS_AVAILABLE['workflow'] = True + print("โœ… WorkflowBuilder imported successfully!") +except ImportError as e: + COMPONENTS_AVAILABLE['workflow'] = False + print(f"โŒ WorkflowBuilder not available: {e}") + +try: + from gradio_gradiodesigner import GradioDesigner + COMPONENTS_AVAILABLE['designer'] = True + print("โœ… GradioDesigner imported successfully!") +except ImportError as e: + COMPONENTS_AVAILABLE['designer'] = False + print(f"โŒ GradioDesigner not available: {e}") + +try: + from gradio_iframecomponent import IFrame + COMPONENTS_AVAILABLE['iframe'] = True + print("โœ… IFrame imported successfully!") +except ImportError as e: + COMPONENTS_AVAILABLE['iframe'] = False + print(f"โŒ IFrame not available: {e}") + +print(f"\n๐Ÿ“Š Components Status: {COMPONENTS_AVAILABLE}") + +# Global state for app data +app_state = { + "name": "", + "workflow": None, + "design": None, + "config": None +} + +# Backend integration - Your provided functions +def create_prompt(base_prompt, input_workflow, modal_params, modal_demo, all_nodes, external_modal_config, gradio_design_json): + base_prompt = base_prompt + "Here are all of the workflow nodes plus corresponding python codes \n" + all_nodes + base_prompt = base_prompt + "Here are the modal parameters you can use \n " + modal_params + base_prompt = base_prompt + "Here is the modal gradio app demo \n" + modal_demo + + if len(external_modal_config) > 1: + base_prompt = base_prompt + "Use this modal config given by the user: \n" + external_modal_config + if len(gradio_design_json) > 1: + base_prompt = base_prompt + "Use this gradio app design json: \n" + gradio_design_json + + base_prompt = base_prompt + "Here is the input workflow \n" + input_workflow + + return base_prompt + +def query_anthropic(base_prompt, gradio_image_path=None): + if not ANTHROPIC_AVAILABLE: + raise Exception("Anthropic client not available. Please set ANTHROPIC_API_KEY environment variable.") + + content = [{"type": "text", "text": base_prompt}] + + if gradio_image_path and os.path.exists(gradio_image_path): + try: + with open(gradio_image_path, "rb") as f: + image_data = base64.b64encode(f.read()).decode() + + content.append({ + "type": "image", + "source": { + "type": "base64", + "media_type": "image/png", + "data": image_data + } + }) + except (IOError, OSError) as e: + print(f"Warning: Could not read image file: {e}") + pass + + response = client.messages.create( + model="claude-sonnet-4-20250514", + max_tokens=5000, + messages=[{ + "role": "user", + "content": content + }] + ) + + return response.content[0].text + +def generate_gradio_app(base_prompt, gradio_image_path=None): + try: + response = query_anthropic(base_prompt, gradio_image_path) + + # Extract Python code from response + start_idx = response.find("```") + end_idx = response.rfind("```") + + if start_idx == -1 or end_idx == -1: + raise Exception("No Python code found in response") + + gradio_code = response[start_idx + 9:end_idx] + + # Ensure the directory exists and write to temp_app.py + temp_app_path = os.path.join(current_dir, "temp_app.py") + with open(temp_app_path, 'w', encoding='utf-8') as temp_app: + temp_app.write(gradio_code) + + print(f"โœ… Generated app saved to: {temp_app_path}") + return True + + except Exception as e: + print(f"โŒ Error generating Gradio app: {e}") + return False + +def parse_modal_url_specific(deployment_output): + pattern = r'https://jniranja--[a-zA-Z0-9\-]+\.modal\.run' + + match = re.search(pattern, deployment_output) + if match: + return match.group(0) + return None + +def deploy_gradio_app(): + try: + temp_app_path = os.path.join(current_dir, "temp_app.py") + + if not os.path.exists(temp_app_path): + return "โŒ temp_app.py not found. Generate app first." + + print("๐Ÿš€ Starting Modal deployment...") + time.sleep(1) + + # Run modal deploy command + result = subprocess.run( + ["modal", "deploy", temp_app_path], + capture_output=True, + text=True, + cwd=current_dir, + timeout=360 # 5 minute timeout + ) + + print("๐Ÿ“‹ Deployment output:") + print(result.stdout) + + if result.stderr: + print("โš ๏ธ Deployment stderr:") + print(result.stderr) + + # Parse URL from output + app_url = parse_modal_url_specific(result.stdout) + + if result.returncode != 0 or not app_url: + return "โŒ Deployment failed. Check logs above." + + print(f"โœ… Deployment successful: {app_url}") + return app_url + + except subprocess.TimeoutExpired: + return "โŒ Deployment timed out after 5 minutes" + except Exception as e: + print(f"โŒ Deployment error: {e}") + return f"โŒ Deployment error: {str(e)}" + +def main_function(base_prompt, input_workflow, modal_params, modal_demo, all_nodes, external_modal_config="", gradio_design_json="", gradio_image_path=None): + try: + print("๐Ÿ”ง Setting things up!") + + # Create the full prompt + full_prompt = create_prompt( + base_prompt, + input_workflow, + modal_params, + modal_demo, + all_nodes, + external_modal_config, + gradio_design_json + ) + + print("๐Ÿค– Generating Gradio app with AI...") + + # Generate the app + if generate_gradio_app(full_prompt, gradio_image_path): + print("๐Ÿš€ Deploying Gradio app to Modal...") + + app_url = deploy_gradio_app() + + # Retry once if deployment failed + if app_url.startswith("โŒ"): + print("๐Ÿ”„ Deployment failed, trying again...") + + if generate_gradio_app(full_prompt, gradio_image_path): + app_url = deploy_gradio_app() + else: + return "โŒ Failed to regenerate app" + + return app_url + else: + return "โŒ Failed to generate Gradio app" + + except Exception as e: + print(f"โŒ Error in main_function: {e}") + return f"โŒ Error: {str(e)}" + +# Helper functions for the frontend +def analyze_workflow(workflow_data): + """Analyze the workflow and provide insights""" + if not workflow_data or not isinstance(workflow_data, dict): + return "No workflow data available" + + nodes = workflow_data.get("nodes", []) + edges = workflow_data.get("edges", []) + + # Count node types + node_counts = {} + for node in nodes: + node_type = node.get("type", "unknown") + node_counts[node_type] = node_counts.get(node_type, 0) + 1 + + # Category analysis + input_nodes = [n for n in nodes if n.get("type", "").endswith("Input")] + processing_nodes = [n for n in nodes if n.get("type") in ["ChatModel", "OpenAIModel", "RAGQuery", "ExecutePython", "ConditionalLogic", "APIRequest"]] + output_nodes = [n for n in nodes if n.get("type", "").endswith("Output")] + + analysis = f"""## ๐Ÿ” Workflow Analysis + +### Overview +- **Total Nodes:** {len(nodes)} +- **Total Connections:** {len(edges)} +- **Workflow Complexity:** {'Simple' if len(nodes) <= 5 else 'Medium' if len(nodes) <= 10 else 'Complex'} + +### Node Categories +- **Input Nodes:** {len(input_nodes)} ๐Ÿ“ฅ +- **Processing Nodes:** {len(processing_nodes)} โš™๏ธ +- **Output Nodes:** {len(output_nodes)} ๐Ÿ“ค + +### Node Details""" + + for node_type, count in node_counts.items(): + emoji_map = { + "ChatInput": "๐Ÿ’ฌ", "Input": "๐Ÿ“ฅ", "ChatOutput": "๐Ÿ’ญ", "Output": "๐Ÿ“ค", + "ChatModel": "๐Ÿง ", "OpenAIModel": "๐ŸŽฏ", "RAGQuery": "๐Ÿ”Ž", "KnowledgeBase": "๐Ÿ“–", + "ExecutePython": "๐Ÿ", "ConditionalLogic": "๐Ÿ”€", "APIRequest": "๐ŸŒ", "WebSearch": "๐Ÿ”" + } + emoji = emoji_map.get(node_type, "โšก") + analysis += f"\n- **{node_type}:** {count} {emoji}" + + return analysis + +def add_app_name_to_workflow(workflow_data, app_name): + """Add app name to workflow metadata""" + if not workflow_data: + workflow_data = {"nodes": [], "edges": []} + + workflow_data["app_name"] = app_name + workflow_data["created_at"] = datetime.datetime.now().isoformat() + workflow_data["workflow_id"] = f"{app_name.lower().replace(' ', '-')}-{int(datetime.datetime.now().timestamp())}" + + return workflow_data + +def save_workflow_with_name(workflow_data, app_name): + """Save workflow with app name added""" + if app_name: + app_state["name"] = app_name + updated_workflow = add_app_name_to_workflow(workflow_data, app_name) + app_state["workflow"] = updated_workflow + return updated_workflow + return workflow_data + +def validate_workflow(workflow_data): + """Validate if workflow is complete and ready for deployment""" + if not workflow_data: + return False, "No workflow data found" + + nodes = workflow_data.get("nodes", []) + edges = workflow_data.get("edges", []) + + if len(nodes) == 0: + return False, "Workflow must have at least one node" + + # Check for input and output nodes + has_input = any(n.get("type", "").endswith("Input") for n in nodes) + has_output = any(n.get("type", "").endswith("Output") for n in nodes) + + if not has_input: + return False, "Workflow must have at least one Input node" + + if not has_output: + return False, "Workflow must have at least one Output node" + + if len(edges) == 0 and len(nodes) > 1: + return False, "Nodes must be connected with edges" + + return True, "Workflow is valid and ready for deployment" + +def export_workflow(workflow_data): + """Export workflow as formatted JSON""" + if not workflow_data: + return "No workflow to export" + return json.dumps(workflow_data, indent=2) + +def generate_python_code(workflow_data): + """Generate Python code from workflow""" + if not workflow_data: + return "No workflow data" + + app_name = workflow_data.get("app_name", "MyApp") + nodes = workflow_data.get("nodes", []) + + code = f'''import gradio as gr +import json + +def create_{app_name.lower().replace(" ", "_")}_app(): + """Generated {app_name} from FlowForge visual editor""" + + # Initialize components +''' + + # Generate code for each node + for node in nodes: + node_type = node.get("type", "") + data = node.get("data", {}) + node_id = node.get("id", "").replace("-", "_") + display_name = data.get("display_name", "Component") + + if node_type == "ChatInput": + code += f''' {node_id} = gr.Textbox( + label="{display_name}", + placeholder="Enter your message...", + lines=2 + ) +''' + elif node_type == "Input": + data_type = data.get("template", {}).get("data_type", {}).get("value", "string") + if data_type == "image": + code += f''' {node_id} = gr.Image(label="{display_name}") +''' + elif data_type == "file": + code += f''' {node_id} = gr.File(label="{display_name}") +''' + else: + code += f''' {node_id} = gr.Textbox(label="{display_name}") +''' + elif node_type in ["ChatModel", "OpenAIModel"]: + code += f''' # {display_name} - AI Model Processing + def {node_id}_process(input_text): + # TODO: Implement your AI model logic here + # Model: {data.get("template", {}).get("model", {}).get("value", "gpt-4")} + return f"AI Response: {{input_text}}" +''' + elif node_type in ["ChatOutput", "Output"]: + code += f''' {node_id} = gr.Textbox( + label="{display_name}", + interactive=False + ) +''' + + code += f''' + # Create the interface + with gr.Blocks(title="{app_name}") as demo: + gr.Markdown("# {app_name}") + gr.Markdown("*Generated with FlowForge*") + + # Add your component layout here + # TODO: Connect components based on your workflow + + return demo + +if __name__ == "__main__": + app = create_{app_name.lower().replace(" ", "_")}_app() + app.launch() +''' + + return code + +def create_modal_config(cpu, memory, gpu_type, gpu_count, min_containers, max_containers): + """Create Modal deployment configuration""" + config = { + "image": "modal.Image.debian_slim(python_version='3.12').pip_install('torch', 'transformers', 'accelerate', 'gradio')", + "cpu": float(cpu), + "memory": int(memory), + "gpu": f"modal.gpu.{gpu_type}(count={gpu_count})" if gpu_type != "none" else "none", + "min_containers": int(min_containers), + "max_containers": int(max_containers), + "timeout": 3600, + "max_concurrent_inputs": 100 + } + app_state["config"] = config + return json.dumps(config, indent=2) + +def build_app_with_backend(workflow_json, design_json, modal_config, app_name): + """FIXED: Build and deploy the app using the backend integration""" + if not app_name: + return "โŒ Please set an app name first" + + if not ANTHROPIC_AVAILABLE: + return "โŒ Anthropic API not available. Please set ANTHROPIC_API_KEY environment variable." + + # Validate workflow + # is_valid, message = validate_workflow(workflow_json) + # if not is_valid: + # return f"โŒ Workflow validation failed: {message}" + + try: + # Prepare data for backend + input_workflow = json.dumps(workflow_json, indent=2) if workflow_json else "" + print(input_workflow) + external_modal_config = str(modal_config) if modal_config else "" + gradio_design_json = json.dumps(design_json, indent=2) if design_json else "" + + + # Call the backend main function + result = main_function( + base_prompt=base_prompt, + input_workflow=input_workflow, + modal_params=modal_params, + modal_demo=modal_demo, + all_nodes=all_nodes, + external_modal_config=external_modal_config, + gradio_design_json=gradio_design_json, + gradio_image_path=None + ) + + return result + + except Exception as e: + print(f"โŒ Error in build_app_with_backend: {e}") + return f"โŒ Error building app: {str(e)}" + +def load_url_in_iframe(url): + """Load URL in iframe with validation""" + if not url or url.startswith("โŒ"): + return "https://www.gradio.app" + + # Add protocol if missing + if not url.startswith(('http://', 'https://')): + url = 'https://' + url + + return url + +def create_fallback_component(component_name, message): + """Create a fallback component when the real component isn't available""" + return gr.HTML(f""" +
+

โš ๏ธ {component_name} Not Available

+

{message}

+

To enable this component:

+
    +
  1. Navigate to the {component_name.lower().replace(' ', '')} folder
  2. +
  3. Run: pip install -e .
  4. +
  5. Restart this demo
  6. +
+
+ """) + +def toggle_designer_visibility(show_designer): + """Toggle designer section visibility""" + return gr.Column(visible=show_designer) + +def set_app_name(name): + """Set the global app name""" + app_state["name"] = name + return f"โœ… App name set to: {name}" + +# Create the main interface with improved styling +with gr.Blocks( + title="FlowForge - AI App Builder", + theme=gr.themes.Soft(), + css=""" + /* Enhanced styling */ + .main-container { + max-width: 1600px; + margin: 0 auto; + padding: 0 1rem; + } + + /* Beautiful centered header */ + .hero-header { + text-align: center; + padding: 1.5rem 1rem; + background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); + color: white; + border-radius: 12px; + margin: 1rem 0 1.5rem 0; + box-shadow: 0 4px 15px rgba(0,0,0,0.1); + } + + .hero-title { + font-size: 2.2rem; + font-weight: 700; + margin: 0 0 0.5rem 0; + text-shadow: 1px 1px 2px rgba(0,0,0,0.2); + letter-spacing: -0.5px; + } + + .hero-subtitle { + font-size: 1.1rem; + opacity: 0.95; + margin: 0; + font-weight: 400; + } + + /* App name section */ + .app-name-section { + background: #f0f9ff; + border: 2px solid #0ea5e9; + border-radius: 15px; + padding: 2rem; + margin: 2rem 0; + text-align: center; + } + + /* Instructions styling */ + .instructions { + background: #fef3c7; + border-left: 4px solid #f59e0b; + padding: 1rem 1.5rem; + margin: 1rem 0; + border-radius: 0 8px 8px 0; + font-size: 0.9rem; + } + + .instructions h4 { + margin: 0 0 0.5rem 0; + color: #92400e; + font-size: 1rem; + } + + .instructions ul { + margin: 0; + padding-left: 1.5rem; + } + + .instructions li { + margin: 0.3rem 0; + color: #78350f; + } + + /* Status indicators */ + .status-success { + background: #dcfce7; + color: #166534; + padding: 0.5rem 1rem; + border-radius: 25px; + font-weight: 500; + display: inline-block; + margin: 0.5rem 0; + } + + .status-error { + background: #fee2e2; + color: #991b1b; + padding: 0.5rem 1rem; + border-radius: 25px; + font-weight: 500; + display: inline-block; + margin: 0.5rem 0; + } + + .workflow-section { + margin-bottom: 2rem; + padding: 2rem; + background: white; + border-radius: 15px; + box-shadow: 0 4px 15px rgba(0,0,0,0.05); + } + + .feature-card { + background: white; + border-radius: 15px; + padding: 2rem; + margin: 1rem 0; + box-shadow: 0 4px 15px rgba(0,0,0,0.05); + border: 1px solid #e2e8f0; + transition: all 0.3s ease; + } + + .feature-card:hover { + transform: translateY(-2px); + box-shadow: 0 8px 25px rgba(0,0,0,0.1); + } + """ +) as demo: + + # Beautiful centered header + gr.HTML(""" +
+

โœจ FlowForge

+

๐Ÿš€ Build your own AI app in 3 easy steps

+
+ """) + + # App Name Setup Section + with gr.Column(elem_classes=["clean-section"]): + # gr.HTML('

Build your own AI app in 3 easy steps

') + app_name_input = gr.Textbox( + placeholder="Enter your app name", + show_label=False, + container=False, + elem_classes=["compact-input"] + ) + + set_name_btn = gr.Button( + "Continue", + variant="primary", + size="sm", + elem_classes=["compact-btn"] + ) + + app_name_status = gr.HTML("", elem_classes=["status"]) + + set_name_btn.click( + fn=set_app_name, + inputs=[app_name_input], + outputs=[app_name_status] + ) + # Main application tabs + with gr.Tabs(elem_classes=["step-tabs"]) as tabs: + + # Step 1: Workflow Builder + with gr.TabItem("๐ŸŽจ Create Workflow", elem_classes=["workflow-section"]) as tab1: + # Instructions + + if COMPONENTS_AVAILABLE['workflow']: + with gr.Column(elem_classes=["main-container"]): + workflow_builder = WorkflowBuilder( + label="๐ŸŽจ Visual Workflow Designer", + info="Build your AI workflow by connecting nodes" + ) + + with gr.Row(): + with gr.Column(scale=2): + workflow_json = gr.JSON( + label="๐Ÿ“‹ Workflow Configuration", + show_label=True, + visible=False + ) + with gr.Column(scale=1): + save_workflow_btn = gr.Button( + "๐Ÿ’พ Save Workflow", + variant="primary", + size="lg" + ) + + # Analysis section + with gr.Column(elem_classes=["feature-card"]): + workflow_analysis = gr.Markdown("### ๐Ÿ” Workflow Analysis\n*Save your workflow to see analysis*") + workflow_validation = gr.HTML("") + + # Export options + with gr.Accordion("๐Ÿ“ฅ Export Options", open=False): + with gr.Row(): + export_json_btn = gr.Button("๐Ÿ“„ Export JSON", variant="secondary") + export_code_btn = gr.Button("๐Ÿ Generate Python Code", variant="secondary") + + with gr.Column(): + exported_json = gr.Code(label="Exported JSON", language="json", visible=False) + exported_code = gr.Code(label="Generated Python Code", language="python", visible=False) + + gr.HTML(""" +
+

๐Ÿ“‹ How to Create Your Workflow:

+ +
+ """) + + # Event handlers + save_workflow_btn.click( + fn=save_workflow_with_name, + inputs=[workflow_builder, app_name_input], + outputs=[workflow_json] + ).then( + fn=analyze_workflow, + inputs=[workflow_json], + outputs=[workflow_analysis] + ).then( + fn=lambda w: ( + gr.JSON(visible=True), + gr.HTML(f'
โœ… Workflow saved successfully!
') + if w else gr.HTML('
โŒ Failed to save workflow
') + ), + inputs=[workflow_json], + outputs=[workflow_json, workflow_validation] + ) + + export_json_btn.click( + fn=export_workflow, + inputs=[workflow_json], + outputs=[exported_json] + ).then( + fn=lambda x: gr.Code(visible=True), + inputs=[exported_json], + outputs=[exported_json] + ) + + export_code_btn.click( + fn=generate_python_code, + inputs=[workflow_json], + outputs=[exported_code] + ).then( + fn=lambda x: gr.Code(visible=True), + inputs=[exported_code], + outputs=[exported_code] + ) + else: + create_fallback_component("WorkflowBuilder", "Workflow visual editor not available") + + # Step 2: Gradio Designer + with gr.TabItem("๐ŸŽจ Design Interface", elem_classes=["workflow-section"]) as tab2: + # Instructions + gr.HTML(""" +
+

๐ŸŽจ How to Design Your Interface:

+ +
+ """) + + if COMPONENTS_AVAILABLE['designer']: + with gr.Column(): + with gr.Row(): + use_custom_design = gr.Checkbox( + label="โœจ Create Custom Interface Design", + value=False, + info="Design a custom interface for your workflow (optional)" + ) + + with gr.Column(visible=False, elem_classes=["feature-card"]) as designer_section: + gradio_designer = GradioDesigner( + label="๐ŸŽจ Visual App Designer", + value={"components": [], "layout": "blocks"} + ) + + with gr.Row(): + with gr.Column(scale=2): + design_json = gr.JSON(label="๐Ÿ“‹ Design Configuration", show_label=True) + with gr.Column(scale=1): + save_design_btn = gr.Button( + "๐Ÿ’พ Save Design", + variant="primary", + size="lg" + ) + + save_design_btn.click( + fn=lambda x: x, + inputs=[gradio_designer], + outputs=[design_json] + ) + + # FIXED: Proper visibility toggle + use_custom_design.change( + fn=toggle_designer_visibility, + inputs=[use_custom_design], + outputs=[designer_section] + ) + + # Default design fallback + with gr.Column(elem_classes=["feature-card"]): + gr.Markdown(""" + ### ๐ŸŽฏ Default Interface + *If you don't create a custom design, FlowForge will automatically generate a clean interface based on your workflow.* + + **Default Features:** + - Auto-generated forms for your workflow inputs + - Clean, responsive layout + - Built-in error handling and validation + - Mobile-friendly design + """) + else: + create_fallback_component("GradioDesigner", "Interface designer not available") + + # Step 3: Modal Deployment + with gr.TabItem("๐Ÿš€ Deploy to Modal", elem_classes=["workflow-section"]) as tab3: + # Instructions + gr.HTML(""" +
+

๐Ÿš€ How to Deploy Your App:

+ +
+ """) + + # # Add logging component + # with gr.Column(elem_classes=["feature-card"]): + # logs = gr.Textbox( + # label="๐Ÿ“ Build Logs", + # lines=5, + # interactive=False, + # show_copy_button=True, + # elem_classes=["logs-box"] + # ) + + with gr.Column(): + # ONLY wrap the resource configuration in accordion + with gr.Accordion("โš™๏ธ Configure Modal Resources", open=False): + with gr.Column(scale=1, elem_classes=["feature-card"]): + gr.Markdown("#### ๐Ÿ’ป Resource Configuration") + with gr.Group(): + with gr.Row(): + cpu = gr.Slider(0.1, 8.0, value=0.5, step=0.1, label="๐Ÿ”ง CPU Cores") + memory = gr.Slider(128, 32768, value=512, step=128, label="๐Ÿง  Memory (MB)") + + with gr.Row(): + gpu_type = gr.Dropdown( + choices=["none", "T4", "A10G", "A100"], + value="none", + label="โšก GPU Type" + ) + gpu_count = gr.Number(1, label="๐Ÿ”ข GPU Count", minimum=1, maximum=8) + + gr.Markdown("#### ๐Ÿ“ˆ Auto Scaling") + with gr.Row(): + min_containers = gr.Number(0, label="๐Ÿ“‰ Min Containers", minimum=0) + max_containers = gr.Number(10, label="๐Ÿ“ˆ Max Containers", minimum=1) + + # Keep modal config and update button outside accordion + with gr.Row(): + with gr.Column(scale=2): + modal_config = gr.JSON(label="โš™๏ธ Modal Configuration", show_label=True) + with gr.Column(scale=1): + update_config_btn = gr.Button( + "๐Ÿ”„ Update Config", + variant="secondary", + size="lg" + ) + + update_config_btn.click( + fn=create_modal_config, + inputs=[cpu, memory, gpu_type, gpu_count, min_containers, max_containers], + outputs=[modal_config] + ) + + # Deployment section - keep this exactly as it was + with gr.Column(elem_classes=["feature-card"]): + gr.Markdown("### ๐Ÿš€ Deploy Your Application") + + with gr.Row(): + deploy_btn = gr.Button( + "๐Ÿš€ Build & Deploy App with AI", + # "๐Ÿš€ Build your own AI app in 3 easy steps", + variant="primary", + size="lg", + scale=2 + ) + deployment_status = gr.HTML("") + + app_url = gr.Textbox( + label="๐ŸŒ Your Deployed App URL", + interactive=False, + placeholder="Your app will appear here after deployment...", + info="Share this URL with others to access your deployed app" + ) + + # Keep the deploy button click handler as is + deploy_btn.click( + fn=build_app_with_backend, + inputs=[workflow_json, design_json, modal_config, app_name_input], + outputs=[app_url] + ).then( + fn=lambda url: ( + gr.HTML('
โœ… Deployment successful!
') + if not url.startswith("โŒ") + else gr.HTML('
โŒ Deployment failed
') + ), + inputs=[app_url], + outputs=[deployment_status] + ) + + # Live preview + if COMPONENTS_AVAILABLE['iframe']: + with gr.Column(elem_classes=["feature-card"]): + gr.Markdown("### ๐Ÿ“ฑ Live App Preview") + app_preview = IFrame( + label="๐Ÿ” App Preview", + height=600 + ) + app_url.change( + fn=load_url_in_iframe, + inputs=[app_url], + outputs=[app_preview] + ) + else: + create_fallback_component("IFrame", "App preview not available") + +# Add CSS for logs only +gr.HTML(""" + +""") + +if __name__ == "__main__": + demo.launch( + server_name="0.0.0.0", + show_error=True + )