jamtur01's picture
Upload folder using huggingface_hub
9c6594c verified
import base64
import contextlib
import os
import re
import tempfile
import warnings
from collections.abc import AsyncIterator, Sequence
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING, Any
from urllib.parse import unquote
import gradio_client.utils as client_utils
import httpx
from mcp import types
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from PIL import Image
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.routing import Mount, Route
from starlette.types import Receive, Scope, Send
from gradio import processing_utils, route_utils, utils
from gradio.blocks import BlockFunction
from gradio.components import State
from gradio.data_classes import FileData
if TYPE_CHECKING:
from gradio.blocks import BlockContext, Blocks
from gradio.components import Component
DEFAULT_TEMP_DIR = os.environ.get("GRADIO_TEMP_DIR") or str(
Path(tempfile.gettempdir()) / "gradio"
)
class GradioMCPServer:
"""
A class for creating an MCP server around a Gradio app.
Args:
blocks: The Blocks app to create the MCP server for.
"""
def __init__(self, blocks: "Blocks"):
self.blocks = blocks
self.api_info = self.blocks.get_api_info()
self.mcp_server = self.create_mcp_server()
self.root_path = ""
tool_prefix = utils.get_space()
if tool_prefix:
tool_prefix = tool_prefix.split("/")[-1] + "_"
self.tool_prefix = re.sub(r"[^a-zA-Z0-9]", "_", tool_prefix)
else:
self.tool_prefix = ""
self.tool_to_endpoint = self.get_tool_to_endpoint()
self.warn_about_state_inputs()
manager = StreamableHTTPSessionManager(
app=self.mcp_server, json_response=False, stateless=True
)
async def handle_streamable_http(
scope: Scope, receive: Receive, send: Send
) -> None:
await manager.handle_request(scope, receive, send)
@contextlib.asynccontextmanager
async def lifespan(app: Starlette) -> AsyncIterator[None]: # noqa: ARG001
"""Context manager for managing session manager lifecycle."""
async with manager.run():
try:
yield
finally:
pass
self.lifespan = lifespan
self.manager = manager
self.handle_streamable_http = handle_streamable_http
def get_route_path(self, request: Request) -> str:
"""
Gets the route path of the MCP server based on the incoming request.
Can be different depending on whether the request is coming from the MCP SSE transport or the HTTP transport.
"""
url = httpx.URL(str(request.url))
url = url.copy_with(query=None)
url = str(url).rstrip("/")
if url.endswith("/gradio_api/mcp/messages"):
return "/gradio_api/mcp/messages"
else:
return "/gradio_api/mcp/http"
def get_tool_to_endpoint(self) -> dict[str, str]:
"""
Gets all of the tools that are exposed by the Gradio app and also
creates a mapping from the tool names to the endpoint names in the API docs.
"""
tool_to_endpoint = {}
for endpoint_name, endpoint_info in self.api_info["named_endpoints"].items():
if endpoint_info["show_api"]:
block_fn = self.get_block_fn_from_endpoint_name(endpoint_name)
if block_fn is None or block_fn.fn is None:
continue
fn_name = (
getattr(block_fn.fn, "__name__", None)
or (
hasattr(block_fn.fn, "__class__")
and getattr(block_fn.fn.__class__, "__name__", None)
)
or endpoint_name.lstrip("/")
)
tool_name = self.tool_prefix + fn_name
while tool_name in tool_to_endpoint:
tool_name = tool_name + "_"
tool_to_endpoint[tool_name] = endpoint_name
return tool_to_endpoint
def warn_about_state_inputs(self) -> None:
"""
Warn about tools that have gr.State inputs.
"""
for _, endpoint_name in self.tool_to_endpoint.items():
block_fn = self.get_block_fn_from_endpoint_name(endpoint_name)
if block_fn and any(isinstance(input, State) for input in block_fn.inputs):
warnings.warn(
"This MCP server includes a tool that has a gr.State input, which will not be "
"updated between tool calls. The original, default value of the State will be "
"used each time."
)
def create_mcp_server(self) -> Server:
"""
Create an MCP server for the given Gradio Blocks app.
Parameters:
blocks: The Blocks app to create the MCP server for.
Returns:
The MCP server.
"""
server = Server(str(self.blocks.title or "Gradio App"))
@server.call_tool()
async def call_tool(
name: str, arguments: dict[str, Any]
) -> list[types.TextContent | types.ImageContent]:
"""
Call a tool on the Gradio app.
Args:
name: The name of the tool to call.
arguments: The arguments to pass to the tool.
"""
context_request = self.mcp_server.request_context.request
if context_request is None:
raise ValueError(
"Could not find the request object in the MCP server context. This is not expected to happen. Please raise an issue: https://github.com/gradio-app/gradio."
)
route_path = self.get_route_path(context_request)
root_url = route_utils.get_root_url(
request=context_request,
route_path=route_path,
root_path=self.root_path,
)
_, filedata_positions = self.get_input_schema(name)
processed_kwargs = self.convert_strings_to_filedata(
arguments, filedata_positions
)
endpoint_name = self.tool_to_endpoint.get(name)
if endpoint_name is None:
raise ValueError(f"Unknown tool for this Gradio app: {name}")
block_fn = self.get_block_fn_from_endpoint_name(endpoint_name)
assert block_fn is not None # noqa: S101
if endpoint_name in self.api_info["named_endpoints"]:
parameters_info = self.api_info["named_endpoints"][endpoint_name][
"parameters"
]
processed_args = client_utils.construct_args(
parameters_info,
(),
processed_kwargs,
)
else:
processed_args = []
processed_args = self.insert_empty_state(block_fn.inputs, processed_args)
output = await self.blocks.process_api(
block_fn=block_fn,
inputs=processed_args,
request=context_request,
)
processed_args = self.pop_returned_state(block_fn.inputs, processed_args)
return self.postprocess_output_data(output["data"], root_url)
@server.list_tools()
async def list_tools() -> list[types.Tool]:
"""
List all tools on the Gradio app.
"""
tools = []
for tool_name, endpoint_name in self.tool_to_endpoint.items():
block_fn = self.get_block_fn_from_endpoint_name(endpoint_name)
assert block_fn is not None and block_fn.fn is not None # noqa: S101
description, parameters, returns = utils.get_function_description(
block_fn.fn
)
if returns:
description += (
("" if description.endswith(".") else ".")
+ " Returns: "
+ ", ".join(returns)
)
schema, _ = self.get_input_schema(tool_name, parameters)
tools.append(
types.Tool(
name=tool_name,
description=description,
inputSchema=schema,
)
)
return tools
return server
def launch_mcp_on_sse(self, app: Starlette, subpath: str, root_path: str) -> None:
"""
Launch the MCP server on the SSE transport.
Parameters:
app: The Gradio app to mount the MCP server on.
subpath: The subpath to mount the MCP server on. E.g. "/gradio_api/mcp"
root_path: The root path of the Gradio Blocks app.
"""
messages_path = "/messages/"
sse = SseServerTransport(messages_path)
self.root_path = root_path
async def handle_sse(request):
try:
async with sse.connect_sse(
request.scope, request.receive, request._send
) as streams:
await self.mcp_server.run(
streams[0],
streams[1],
self.mcp_server.create_initialization_options(),
)
return Response()
except Exception as e:
print(f"MCP SSE connection error: {str(e)}")
raise
app.mount(
subpath,
Starlette(
routes=[
Route(
"/schema",
endpoint=self.get_complete_schema, # Not required for MCP but used by the Hugging Face MCP server to get the schema for MCP Spaces without needing to establish an SSE connection
),
Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message),
Mount("/http/", app=self.handle_streamable_http),
],
),
)
def get_block_fn_from_endpoint_name(
self, endpoint_name: str
) -> "BlockFunction | None":
"""
Get the BlockFunction for a given endpoint name (e.g. "/predict").
Parameters:
endpoint_name: The name of the endpoint to get the BlockFunction for.
Returns:
The BlockFunction for the given endpoint name, or None if it is not found.
"""
block_fn = next(
(
fn
for fn in self.blocks.fns.values()
if fn.api_name == endpoint_name.lstrip("/")
),
None,
)
return block_fn
@staticmethod
def insert_empty_state(
inputs: Sequence["Component | BlockContext"], data: list
) -> list:
"""
Insert None placeholder values for any State input components, as State inputs
are not included in the endpoint schema.
"""
for i, input_component_type in enumerate(inputs):
if isinstance(input_component_type, State):
data.insert(i, None)
return data
@staticmethod
def pop_returned_state(
inputs: Sequence["Component | BlockContext"], data: list
) -> list:
"""
Remove any values corresponding to State output components from the data
as State outputs are not included in the endpoint schema.
"""
for i, input_component_type in enumerate(inputs):
if isinstance(input_component_type, State):
data.pop(i)
return data
def get_input_schema(
self,
tool_name: str,
parameters: dict[str, str] | None = None,
) -> tuple[dict[str, Any], list[list[str | int]]]:
"""
Get the input schema of the Gradio app API, appropriately formatted for MCP.
Parameters:
tool_name: The name of the tool to get the schema for, e.g. "predict"
parameters: The description and parameters of the tool to get the schema for.
Returns:
- The input schema of the Gradio app API.
- A list of positions of FileData objects in the input schema.
"""
endpoint_name = self.tool_to_endpoint.get(tool_name)
if endpoint_name is None:
raise ValueError(f"Unknown tool for this Gradio app: {tool_name}")
named_endpoints = self.api_info["named_endpoints"]
endpoint_info = named_endpoints.get(endpoint_name)
assert endpoint_info is not None # noqa: S101
schema = {
"type": "object",
"properties": {
p["parameter_name"]: {
**p["type"],
**(
{"description": parameters[p["parameter_name"]]}
if parameters and p["parameter_name"] in parameters
else {}
),
**(
{"default": p["parameter_default"]}
if "parameter_default" in p and p["parameter_default"]
else {}
),
}
for p in endpoint_info["parameters"]
},
}
return self.simplify_filedata_schema(schema)
async def get_complete_schema(self, request) -> JSONResponse: # noqa: ARG002
"""
Get the complete schema of the Gradio app API. For debugging purposes, also used by
the Hugging Face MCP server to get the schema for MCP Spaces without needing to
establish an SSE connection.
Parameters:
request: The Starlette request object.
Returns:
A JSONResponse containing a dictionary mapping tool names to their input schemas.
"""
if not self.api_info:
return JSONResponse({})
schemas = []
for tool_name, endpoint_name in self.tool_to_endpoint.items():
block_fn = self.get_block_fn_from_endpoint_name(endpoint_name)
assert block_fn is not None and block_fn.fn is not None # noqa: S101
description, parameters, returns = utils.get_function_description(
block_fn.fn
)
if returns:
description += (
("" if description.endswith(".") else ".")
+ " Returns: "
+ ", ".join(returns)
)
schema, _ = self.get_input_schema(tool_name, parameters)
info = {
"name": tool_name,
"description": description,
"inputSchema": schema,
}
schemas.append(info)
return JSONResponse(schemas)
def simplify_filedata_schema(
self, schema: dict[str, Any]
) -> tuple[dict[str, Any], list[list[str | int]]]:
"""
Parses a schema of a Gradio app API to identify positions of FileData objects. Replaces them with base64
strings while keeping track of their positions so that they can be converted back to FileData objects
later.
Parameters:
schema: The original schema of the Gradio app API.
Returns:
A tuple containing the simplified schema and the positions of the FileData objects.
"""
def is_gradio_filedata(obj: Any, defs: dict[str, Any]) -> bool:
if not isinstance(obj, dict):
return False
if "$ref" in obj:
ref = obj["$ref"]
if ref.startswith("#/$defs/"):
key = ref.split("/")[-1]
obj = defs.get(key, {})
else:
return False
props = obj.get("properties", {})
meta = props.get("meta", {})
if "$ref" in meta:
ref = meta["$ref"]
if ref.startswith("#/$defs/"):
key = ref.split("/")[-1]
meta = defs.get(key, {})
else:
return False
type_field = meta.get("properties", {}).get("_type", {})
default_type = meta.get("default", {}).get("_type")
return (
type_field.get("const") == "gradio.FileData"
or default_type == "gradio.FileData"
)
def traverse(
node: Any,
path: list[str | int] | None = None,
defs: dict[str, Any] | None = None,
) -> Any:
if path is None:
path = []
if defs is None:
defs = {}
if isinstance(node, dict):
if "$defs" in node:
defs.update(node["$defs"])
if is_gradio_filedata(node, defs):
filedata_positions.append(path.copy())
for key in ["properties", "additional_description", "$defs"]:
node.pop(key, None)
node["type"] = "string"
node["format"] = "a http or https url to a file"
result = {}
is_schema_root = "type" in node and "properties" in node
for key, value in node.items():
if is_schema_root and key == "properties":
result[key] = traverse(value, path, defs)
else:
path.append(key)
result[key] = traverse(value, path, defs)
path.pop()
return result
elif isinstance(node, list):
result = []
for i, item in enumerate(node):
path.append(i)
result.append(traverse(item, path, defs))
path.pop()
return result
return node
filedata_positions: list[list[str | int]] = []
simplified_schema = traverse(schema)
return simplified_schema, filedata_positions
def convert_strings_to_filedata(
self, value: Any, filedata_positions: list[list[str | int]]
) -> Any:
"""
Convert specific string values back to FileData objects based on their positions.
This is used to convert string values (as base64 encoded strings) to FileData
dictionaries so that they can be passed into .preprocess() logic of a Gradio app.
Parameters:
value: The input data to process, which can be an arbitrary nested data structure
that may or may not contain strings that should be converted to FileData objects.
filedata_positions: List of paths to positions in the input data that should be converted to FileData objects.
Returns:
The processed data with strings converted to FileData objects where appropriate. Base64
encoded strings are first saved to a temporary file and then converted to a FileData object.
Example:
>>> convert_strings_to_filedata(
{"image": "data:image/jpeg;base64,..."},
[["image"]]
)
>>> {'image': FileData(path='<temporary file path>')},
"""
def traverse(node: Any, path: list[str | int] | None = None) -> Any:
if path is None:
path = []
if isinstance(node, dict):
return {
key: traverse(value, path + [key]) for key, value in node.items()
}
elif isinstance(node, list):
return [traverse(item, path + [i]) for i, item in enumerate(node)]
elif isinstance(node, str) and path in filedata_positions:
if node.startswith("data:"):
# Even though base64 is not officially part of our schema, some MCP clients
# might return base64 encoded strings, so try to save it to a temporary file.
return FileData(
path=processing_utils.save_base64_to_cache(
node, DEFAULT_TEMP_DIR
)
)
elif node.startswith(("http://", "https://")):
return FileData(path=node)
else:
raise ValueError(
f"Invalid file data format, provide a url ('http://...' or 'https://...'). Received: {node}"
)
return node
return traverse(value)
@staticmethod
def get_image(file_path: str) -> Image.Image | None:
"""
If a filepath is a valid image, returns a PIL Image object. Otherwise returns None.
"""
if not os.path.exists(file_path):
return None
ext = os.path.splitext(file_path.lower())[1]
if ext not in Image.registered_extensions():
return None
try:
return Image.open(file_path)
except Exception:
return None
@staticmethod
def get_svg(file_data: Any) -> bytes | None:
"""
If a file_data is a valid FileDataDict with a url that is a data:image/svg+xml, returns bytes of the svg. Otherwise returns None.
"""
if isinstance(file_data, dict) and (url := file_data.get("url")):
if isinstance(url, str) and url.startswith("data:image/svg"):
return unquote(url.split(",", 1)[1]).encode()
else:
return None
else:
return None
@staticmethod
def get_base64_data(image: Image.Image, format: str) -> str:
"""
Returns a base64 encoded string of the image.
"""
buffer = BytesIO()
image.save(buffer, format=format)
return base64.b64encode(buffer.getvalue()).decode("utf-8")
def postprocess_output_data(
self, data: Any, root_url: str
) -> list[types.TextContent | types.ImageContent]:
"""
Postprocess the output data from the Gradio app to convert FileData objects back to base64 encoded strings.
Parameters:
data: The output data to postprocess.
"""
return_values = []
data = processing_utils.add_root_url(data, root_url, None)
for output in data:
if svg_bytes := self.get_svg(output):
base64_data = base64.b64encode(svg_bytes).decode("utf-8")
mimetype = "image/svg+xml"
svg_path = processing_utils.save_bytes_to_cache(
svg_bytes, f"{output['orig_name']}", DEFAULT_TEMP_DIR
)
svg_url = f"{root_url}/gradio_api/file={svg_path}"
return_value = [
types.ImageContent(
type="image", data=base64_data, mimeType=mimetype
),
types.TextContent(
type="text",
text=f"SVG Image URL: {svg_url}",
),
]
elif client_utils.is_file_obj_with_meta(output):
if image := self.get_image(output["path"]):
image_format = image.format or "png"
base64_data = self.get_base64_data(image, image_format)
mimetype = f"image/{image_format.lower()}"
return_value = [
types.ImageContent(
type="image", data=base64_data, mimeType=mimetype
),
types.TextContent(
type="text",
text=f"Image URL: {output['url'] or output['path']}",
),
]
else:
return_value = [
types.TextContent(
type="text", text=str(output["url"] or output["path"])
)
]
else:
return_value = [types.TextContent(type="text", text=str(output))]
return_values.extend(return_value)
return return_values