File size: 6,552 Bytes
b15be4b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
import gradio as gr
import sqlite3
import traceback
import os
import re
import uuid
from agno.tools import tool
import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns
import plotly.express as px
# --- DB Functions ---
def init_product_db():
conn = sqlite3.connect("flipkart_mobiles.db")
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS mobiles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
brand TEXT,
color TEXT,
model TEXT,
memory TEXT,
storage TEXT,
rating REAL,
selling_price REAL,
original_price REAL
)
''')
conn.commit()
conn.close()
def read_products():
conn = sqlite3.connect("flipkart_mobiles.db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM mobiles")
rows = cursor.fetchall()
conn.close()
return rows
# --- Tool Wrappers ---
DB_PATH = "flipkart_mobiles.db"
TABLE_NAME = "mobiles"
@tool(show_result=True, stop_after_tool_call=True)
def get_columns_info_from_database(columns: str = "*"):
"""
Database Schema: brand, color, model, memory, storage, rating, selling_price, original_price
Table: mobiles
Query the 'mobiles' table selecting specified columns dynamically.
Input:
- columns: a comma-separated string of column names to select, e.g. "brand, model, rating"
If "*", selects all columns.
Returns:
- Formatted string of rows with selected columns.
"""
if columns.strip() != "*":
if not re.fullmatch(r"[a-zA-Z0-9_,\s]+", columns):
return "Invalid columns format."
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Build query string dynamically
query = f"SELECT {columns} FROM {TABLE_NAME}"
try:
cursor.execute(query)
rows = cursor.fetchall()
# Get column names from cursor description
col_names = [desc[0] for desc in cursor.description]
output_lines = []
for row in rows:
row_dict = dict(zip(col_names, row))
formatted_row = ", ".join(f"{col}: {row_dict[col]}" for col in col_names)
output_lines.append(formatted_row)
return "\n".join(output_lines) if output_lines else "No rows found."
except Exception as e:
return f"Query error: {str(e)}"
finally:
conn.close()
@tool(show_result=True, stop_after_tool_call=True)
def generate_python_code(python_code: str) -> str:
"""
You are a Python data scientist. Take the table and columns information from the chat history or agent memory.
Your task is to generate a valid Python script from the following response.
This table and columns information can be in raw English or structured format from the chat history or agent memory like:
- user: task - description
- tabular strings
- JSON-like text
- general descriptive statistics
You must:
1. Convert the data into a pandas DataFrame (use variable name `df`)
2. Select an appropriate chart (bar chart, pie chart, line chart, etc.) based on the user's query
3. Use matplotlib, seaborn, or plotly to plot. Any one of it to create the chart or graph or plot
4. Save the chart using the variable `image_path` to a PNG file
5. Return only the Python code — no comments, no markdown
### Rules:
- Do not use `plt.show()` or any GUI renderer
- Use clear axis labels and title
- Save the figure using `plt.savefig(image_path)`
- `df` must be used for all data manipulations
- You must generate the full Python code block
- execute that Python code and return the path to the saved image folder.
- Create an image into the "plots" folder.
Example code:
```python
import pandas as pd
import matplotlib.pyplot as plt
data = [
{"id": 1, "name": "Alice", "task": "NLP"},
{"id": 2, "name": "Bob", "task": "Vision"},
{"id": 3, "name": "Alice", "task": "NLP"}
]
df = pd.DataFrame(data)
task_counts = df["task"].value_counts()
plt.figure(figsize=(6, 4))
task_counts.plot(kind="bar", color="skyblue")
plt.xlabel("Task")
plt.ylabel("Count")
plt.title("Task Distribution")
plt.savefig(image_path)
```
"""
return python_code
@tool(show_result=True, stop_after_tool_call=True)
def visualization_tool(python_code: str) -> str:
""" This function is for taking the python code as input from chat history or agent memory and cleaning it accordingly so that it can be executed, then executing it and returning the image path.
"""
try:
cleaned_code = re.sub(r"^```(?:python)?|```$", "", python_code.strip(), flags=re.MULTILINE)
image_path = f"plots/{uuid.uuid4().hex}.png"
os.makedirs("plots", exist_ok=True)
exec_context = {
"pd": pd,
"plt": plt,
"sns": sns,
"px": px,
"image_path": image_path
}
exec(cleaned_code, exec_context)
return image_path
except Exception:
return f"Error executing visualization code:\n{traceback.format_exc()}"
# --- Init DB ---
init_product_db()
# --- Define Toolkit ---
toolkit = [
get_columns_info_from_database,
generate_python_code,
visualization_tool
]
# --- Gradio UI ---
tabbed = gr.TabbedInterface(
interface_list=[
gr.Interface(
fn=get_columns_info_from_database.entrypoint,
inputs=[
gr.Textbox(label="Columns (comma separated, or * for all)", value="*")
],
outputs=gr.Textbox(label="Query Result"),
title="Query Products"
),
gr.Interface(
fn=generate_python_code.entrypoint,
inputs=[
gr.Textbox(label="Python code for Visualization", lines=10)
],
outputs=gr.Textbox(label="Python Code for Visualization"),
title="Python Code Generation"
),
gr.Interface(
fn=visualization_tool.entrypoint,
inputs=[
gr.Textbox(label="Visualization", lines=10)
],
outputs=gr.Textbox(label="Saved Image Path"),
title="Auto Visualization"
)
],
tab_names=["Query Products", "Python Code Generation", "Auto Visualization"]
)
# tabbed.launch(mcp_server=True)
tabbed.launch(server_port=7863, mcp_server=True) |