from mcp.server.fastmcp import FastMCP
import time
from litellm import completion
import os
import glob
import http.client
import json
import openpyxl
import shutil
import requests
from google import genai
from openai import OpenAI
from bs4 import BeautifulSoup
import pexpect

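# Gemini client used by the analyse_audio / analyse_video / analyse_images tools.
# User uploads land under source_dir and are moved into destination_dir (the working
# directory for every sandboxed command) by the transfer_files helpers below.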
client = genai.Client(api_key="AIzaSyDtP05TyoIy9j0uPL7_wLEhgQEE75AZQSc")
clienty = OpenAI(api_key="xyz", base_url="https://akiko19191-backend.hf.space/")

source_dir = "/app/uploads/temp"
destination_dir = "/app/code_interpreter"
files_list = []
downloaded_files = []

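# FastMCP server exposing the sandbox tools over stdio. `Parent` is a module-level
# bash session that run() restarts whenever a long-running command (forever_cmd='true')
# is launched.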
mcp = FastMCP("code_sandbox")
data = {}
result = ""

Parent = pexpect.spawn('bash')

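# Move newly uploaded files from subdirectories of /app/uploads/temp into the sandbox
# working directory; files that already exist at the destination are left untouched.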
def transfer_files():
    try:
        for item in os.listdir(source_dir):
            item_path = os.path.join(source_dir, item)
            if os.path.isdir(item_path):
                for filename in os.listdir(item_path):
                    source_file_path = os.path.join(item_path, filename)
                    destination_file_path = os.path.join(destination_dir, filename)
                    if not os.path.exists(destination_file_path):
                        shutil.move(source_file_path, destination_file_path)
    except:
        pass

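# Same idea for the other folders under /app/uploads: their filenames follow a
# "<prefix>__<name>" pattern, and only the part after "__" is kept as the destination name.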
def transfer_files2():
    try:
        for item in os.listdir("/app/uploads"):
            if "temp" not in item:
                item_path = os.path.join("/app/uploads", item)
                if os.path.isdir(item_path):
                    for filename in os.listdir(item_path):
                        source_file_path = os.path.join(item_path, filename)
                        destination_file_path = os.path.join(destination_dir, filename.split("__")[1])
                        if not os.path.exists(destination_file_path):
                            shutil.move(source_file_path, destination_file_path)
    except:
        pass

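# Helper used by run_code / run_code_files to publish files generated inside the sandbox
# so that download links can be returned to the caller.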
def upload_file(file_path, upload_url):
    """Uploads a file to the specified server endpoint."""
    try:
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        with open(file_path, "rb") as file:
            files = {"file": (os.path.basename(file_path), file)}
            response = requests.post(upload_url, files=files)

        response.raise_for_status()

        if response.status_code == 200:
            print(f"File uploaded successfully. Filename returned by server: {response.text}")
            return response.text
        else:
            print(f"Upload failed. Status code: {response.status_code}, Response: {response.text}")
            return None

    except FileNotFoundError as e:
        print(e)
        return None
    except requests.exceptions.RequestException as e:
        print(f"Upload failed. Network error: {e}")
        return None

TOKEN = "5182224145:AAEjkSlPqV-Q3rH8A9X8HfCDYYEQ44v_qy0"
chat_id = "5075390513"
from requests_futures.sessions import FuturesSession
session = FuturesSession()

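# Core command runner. With forever_cmd='true' the command is started in the persistent
# `Parent` shell and left running (only the first line of its output is returned);
# otherwise a fresh shell runs the command and output is captured until bash prints the
# END sentinel (set via PROMPT_COMMAND) or the timeout expires.
# A minimal sketch of a short-lived call:
#     run("ls -l", timeout_sec=300, forever_cmd='false')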
def run(cmd, timeout_sec, forever_cmd):
    global Parent
    if forever_cmd == 'true':
        Parent.close()
        Parent = pexpect.spawn("bash")
        command = "cd /app/code_interpreter/ && " + cmd
        Parent.sendline(command)
        Parent.readline().decode()
        return str(Parent.readline().decode())

    t = time.time()
    child = pexpect.spawn("bash")
    output = ""
    command = "cd /app/code_interpreter/ && " + cmd

    child.sendline('PROMPT_COMMAND="echo END"')
    child.readline().decode()
    child.readline().decode()

    child.sendline(command)

    while (not child.eof()) and (time.time() - t < timeout_sec):
        x = child.readline().decode()
        output = output + x
        print(x)
        if "END" in x:
            output = output.replace("END", "")
            child.close()
            break
        if "true" in forever_cmd:
            break
    return output

@mcp.tool()
def analyse_audio(audiopath, query) -> dict:
    """Ask another AI model about audio files. The AI model can listen to the audio and answer questions about it. Eg: query='Generate detailed minutes of meeting from the audio clip', audiopath='/app/code_interpreter/<audioname>'. Note: The audio files are automatically present in the /app/code_interpreter directory."""
    transfer_files2()
    myfile = client.files.upload(file=audiopath)

    response = client.models.generate_content(
        model='gemini-2.0-flash',
        contents=[query, myfile]
    )
    return {"Output": str(response.text)}

@mcp.tool()
def analyse_video(videopath, query) -> dict:
    """Ask another AI model about videos. The AI model can watch the video and answer questions about it. Eg: query='Create a very detailed transcript and summary of the video', videopath='/app/code_interpreter/<videoname>'. Note: The videos are automatically present in the /app/code_interpreter directory."""
    transfer_files2()
    video_file = client.files.upload(file=videopath)

    while video_file.state.name == "PROCESSING":
        print('.', end='')
        time.sleep(1)
        video_file = client.files.get(name=video_file.name)

    if video_file.state.name == "FAILED":
        raise ValueError(video_file.state.name)

    response = client.models.generate_content(
        model='gemini-2.0-flash',
        contents=[query, video_file]
    )
    return {"Output": str(response.text)}

@mcp.tool()
def analyse_images(imagepath, query) -> dict:
    """Ask another AI model about images. The AI model can see the image and answer questions about it. Eg: query='Who is the person in this image?', imagepath='/app/code_interpreter/<imagename>'. Note: The images are automatically present in the /app/code_interpreter directory."""
    transfer_files2()
    image_file = client.files.upload(file=imagepath)

    response = client.models.generate_content(
        model='gemini-2.0-flash',
        contents=[query, image_file]
    )
    return {"Output": str(response.text)}

@mcp.tool()
def create_code_files(filename: str, code) -> dict:
    """Create a code file by passing the filename as well as the entire code to write. The file is created by default in the /app/code_interpreter directory. Note: All user-uploaded files that you might need to work upon are stored in the /app/code_interpreter directory."""
    global destination_dir
    transfer_files()
    transfer_files2()

    if isinstance(code, dict):
        with open(os.path.join(destination_dir, filename), 'w', encoding='utf-8') as f:
            json.dump(code, f, ensure_ascii=False, indent=4)
    else:
        with open(os.path.join(destination_dir, filename), "w") as f:
            f.write(str(code))
    return {"info": "The referenced code files were created successfully."}

@mcp.tool()
def run_code(python_packages: str, filename: str, code: str, start_cmd: str, forever_cmd: str) -> dict:
    """
    Execute code in a controlled environment with package installation and file handling.

    Args:
        python_packages: Space-separated list of packages to install (e.g., "numpy matplotlib").
            Output an empty string if using any other language.
            Preinstalled packages: gradio, XlsxWriter, openpyxl.
        filename: Name of the file to create (stored in /app/code_interpreter/).
        code: Full code to write to the file.
        start_cmd: Command to execute the file (e.g., "python /app/code_interpreter/app.py"
            or "bash /app/code_interpreter/app.py").
        forever_cmd: If 'true', the command will run indefinitely. Set to 'true' when running a
            website/server. Run all servers/websites on port 1337. If 'false', the command will
            time out after 300 seconds and the result will be returned.

    Notes:
        - All user-uploaded files are in /app/code_interpreter/.
        - After execution, embed a download link (or display images/gifs/videos directly in markdown format) in your response.
    """
    global destination_dir
    package_names = python_packages.strip()
    command = "pip install"
    if package_names:
        stdot = run(
            f"{command} --break-system-packages {package_names}", timeout_sec=300, forever_cmd='false'
        )
    transfer_files2()
    transfer_files()
    f = open(os.path.join(destination_dir, filename), "w")
    f.write(code)
    f.close()
    global files_list
    stdot = run(start_cmd, 300, forever_cmd)
    onlyfiles = glob.glob("/app/code_interpreter/*")
    onlyfiles = list(set(onlyfiles) - set(files_list))
    uploaded_filenames = []
    for files in onlyfiles:
        try:
            uploaded_filename = upload_file(files, "https://opengpt-4ik5.onrender.com/upload")
            uploaded_filenames.append(f"https://opengpt-4ik5.onrender.com/static/{uploaded_filename}")
        except:
            pass
    files_list = onlyfiles
    return {"output": stdot, "Files_download_link": uploaded_filenames}

@mcp.tool()
def run_code_files(start_cmd: str, forever_cmd: str) -> dict:
    """Executes a shell command to run code files from /app/code_interpreter.

    Runs the given `start_cmd`. The execution behavior depends on `forever_cmd`.
    Any server/website started should use port 1337.

    Args:
        start_cmd (str): The shell command to execute the code.
            (e.g., ``python /app/code_interpreter/app.py`` or ``node /app/code_interpreter/server.js``).
            Files must be in ``/app/code_interpreter``.
        forever_cmd (str): Execution mode.
            - ``'true'``: Runs indefinitely (for servers/websites).
            - ``'false'``: Runs up to 300 seconds and captures output.

    Returns:
        dict: A dictionary containing:
            - ``'output'`` (str): Captured stdout (mainly when forever_cmd='false').
            - ``'Files_download_link'`` (Any): Links/identifiers for downloadable files.

    Notes:
        - After execution, embed a download link (or display images/gifs/videos directly in markdown format) in your response.
        - When editing and re-executing the server with forever_cmd='true', the previous server instance is terminated automatically and the updated server starts, so manual process termination commands such as ``pkill node`` are not required.
        - The opened ports can be externally accessed at https://suitable-liked-ibex.ngrok-free.app/ (ONLY if the website is running successfully).
    """
    global files_list

    stdot = run(start_cmd, 300, forever_cmd)
    onlyfiles = glob.glob("/app/code_interpreter/*")
    onlyfiles = list(set(onlyfiles) - set(files_list))
    uploaded_filenames = []
    for files in onlyfiles:
        try:
            uploaded_filename = upload_file(files, "https://opengpt-4ik5.onrender.com/upload")
            uploaded_filenames.append(f"https://opengpt-4ik5.onrender.com/static/{uploaded_filename}")
        except:
            pass
    files_list = onlyfiles
    return {"output": stdot, "Files_download_link": uploaded_filenames}

@mcp.tool()
def run_shell_command(cmd: str, forever_cmd: str) -> dict:
    """Executes a shell command in a sandboxed Alpine Linux environment.

    Runs the provided `cmd` string within a bash shell. Commands are executed
    relative to the `/app/code_interpreter/` working directory by default.
    The execution behavior (indefinite run vs. timeout) is controlled by
    the `forever_cmd` parameter.

    Important Environment Notes:
        - The execution environment is **Alpine Linux**; commands should be compatible with it.
        - `sudo` commands are restricted for security reasons, so commands that require elevated privileges such as `apk add` CANNOT be executed. Use `pip install` or `npm install` instead.
        - Standard bash features like `&&`, `||`, pipes (`|`), etc., are supported.
        - When installing python packages, add the `--break-system-packages` argument to the pip install command.
        - The following npm packages are preinstalled: express, ejs, chart.js. Any additional packages can be installed with the npm install command.

    Args:
        cmd (str): The shell command to execute.
            Example: ``mkdir test_dir && ls -l``
        forever_cmd (str): Determines the execution mode.
            - ``'true'``: Runs the command indefinitely. Suitable for starting
              servers or long-running processes. Output capture might be limited.
            - ``'false'``: Runs the command until completion or a 300-second
              timeout, whichever comes first. Captures standard output.

    Returns:
        dict: A dictionary containing the execution results:
            - ``'output'`` (str): The captured standard output (stdout) and potentially
              standard error (stderr) from the command.
    """
    transfer_files()
    transfer_files2()
    output = run(cmd, 300, forever_cmd)
    return {"output": output}

@mcp.tool()
def install_python_packages(python_packages: str) -> dict:
    """python_packages to install, separated by spaces, e.g. "numpy matplotlib". The following python packages are preinstalled: gradio XlsxWriter openpyxl."""
    package_names = python_packages.strip()
    command = "pip install"
    if not package_names:
        return {"stdout": "", "info": "No packages specified."}

    stdot = run(
        f"{command} --break-system-packages {package_names}", timeout_sec=300, forever_cmd='false'
    )

    return {"stdout": stdot, "info": "Ran package installation command"}

@mcp.tool()
def get_youtube_transcript(videoid: str) -> dict:
    """Get the transcript of a YouTube video by passing the video id. Eg: videoid='ZacjOVVgoLY'."""
    conn = http.client.HTTPSConnection("youtube-transcript3.p.rapidapi.com")
    headers = {
        'x-rapidapi-key': "2a155d4498mshd52b7d6b7a2ff86p10cdd0jsn6252e0f2f529",
        'x-rapidapi-host': "youtube-transcript3.p.rapidapi.com"
    }
    conn.request("GET", f"/api/transcript?videoId={videoid}", headers=headers)

    res = conn.getresponse()
    data = res.read()
    return json.loads(data)

@mcp.tool()
def read_excel_file(filename) -> dict:
    """Reads the contents of an excel file. Returns a dict with key:value pairs of cell location:cell content. Always run this command first when working with excel files. The excel file is automatically present in the /app/code_interpreter directory."""
    global destination_dir
    transfer_files2()
    transfer_files()

    workbook = openpyxl.load_workbook(os.path.join(destination_dir, filename))

    excel_data_dict = {}

    for sheet_name in workbook.sheetnames:
        sheet = workbook[sheet_name]
        for row in sheet.iter_rows():
            for cell in row:
                cell_coordinate = cell.coordinate
                cell_value = cell.value
                if cell_value is not None:
                    excel_data_dict[cell_coordinate] = str(cell_value)
    return excel_data_dict

@mcp.tool()
def scrape_websites(url_list: list, query: str) -> dict:
    """Scrapes specific website content. `query` is the question you want to ask about the content of the websites, e.g. 'Give .pptx links in the website', 'Summarise the content in very great detail', etc. A maximum of 4 urls can be passed at a time."""
    conn = http.client.HTTPSConnection("scrapeninja.p.rapidapi.com")

    headers = {
        'x-rapidapi-key': "2a155d4498mshd52b7d6b7a2ff86p10cdd0jsn6252e0f2f529",
        'x-rapidapi-host': "scrapeninja.p.rapidapi.com",
        'Content-Type': "application/json"
    }
    Output = ""
    links = ""
    content = ""
    for urls in url_list:
        payload = {"url": urls}
        payload = json.dumps(payload)
        conn.request("POST", "/scrape", payload, headers)
        res = conn.getresponse()
        data = res.read()
        content = content + str(data.decode("utf-8"))

    response = clienty.chat.completions.create(
        model="meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8",
        messages=[
            {"role": "user", "content": f"{query} [CONTENT]:{content}"}
        ],
        stream=True
    )
    for chunk in response:
        if chunk.choices[0].delta.content:
            Output = Output + str(chunk.choices[0].delta.content)

    response2 = clienty.chat.completions.create(
        model="meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8",
        messages=[
            {"role": "user", "content": f"Give all relevant and different types of links in this content. The links may be relevant image links, file links, video links, website links, etc. You must give a minimum of 30 links and a maximum of 50 links. [CONTENT]:{content}"}
        ],
        stream=True
    )
    for chunk in response2:
        if chunk.choices[0].delta.content:
            links = links + str(chunk.choices[0].delta.content)
    return {"website_content": Output, "relevant_links": links}

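# Start an ngrok tunnel so anything the sandbox serves on port 1337 is reachable at
# https://suitable-liked-ibex.ngrok-free.app/, then serve the MCP tools over stdio.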
if __name__ == "__main__":
    Ngrok = pexpect.spawn('bash')
    Ngrok.sendline("ngrok http --url=suitable-liked-ibex.ngrok-free.app 1337")
    Ngrok.readline().decode()
    mcp.run(transport='stdio')