File size: 24,600 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 |
import base64
import contextlib
import os
import re
import tempfile
import warnings
from collections.abc import AsyncIterator, Sequence
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING, Any
from urllib.parse import unquote
import gradio_client.utils as client_utils
import httpx
from mcp import types
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from PIL import Image
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.routing import Mount, Route
from starlette.types import Receive, Scope, Send
from gradio import processing_utils, route_utils, utils
from gradio.blocks import BlockFunction
from gradio.components import State
from gradio.data_classes import FileData
if TYPE_CHECKING:
from gradio.blocks import BlockContext, Blocks
from gradio.components import Component
DEFAULT_TEMP_DIR = os.environ.get("GRADIO_TEMP_DIR") or str(
Path(tempfile.gettempdir()) / "gradio"
)
class GradioMCPServer:
"""
A class for creating an MCP server around a Gradio app.
Args:
blocks: The Blocks app to create the MCP server for.
"""
def __init__(self, blocks: "Blocks"):
self.blocks = blocks
self.api_info = self.blocks.get_api_info()
self.mcp_server = self.create_mcp_server()
self.root_path = ""
tool_prefix = utils.get_space()
if tool_prefix:
tool_prefix = tool_prefix.split("/")[-1] + "_"
self.tool_prefix = re.sub(r"[^a-zA-Z0-9]", "_", tool_prefix)
else:
self.tool_prefix = ""
self.tool_to_endpoint = self.get_tool_to_endpoint()
self.warn_about_state_inputs()
manager = StreamableHTTPSessionManager(
app=self.mcp_server, json_response=False, stateless=True
)
async def handle_streamable_http(
scope: Scope, receive: Receive, send: Send
) -> None:
await manager.handle_request(scope, receive, send)
@contextlib.asynccontextmanager
async def lifespan(app: Starlette) -> AsyncIterator[None]: # noqa: ARG001
"""Context manager for managing session manager lifecycle."""
async with manager.run():
try:
yield
finally:
pass
self.lifespan = lifespan
self.manager = manager
self.handle_streamable_http = handle_streamable_http
def get_route_path(self, request: Request) -> str:
"""
Gets the route path of the MCP server based on the incoming request.
Can be different depending on whether the request is coming from the MCP SSE transport or the HTTP transport.
"""
url = httpx.URL(str(request.url))
url = url.copy_with(query=None)
url = str(url).rstrip("/")
if url.endswith("/gradio_api/mcp/messages"):
return "/gradio_api/mcp/messages"
else:
return "/gradio_api/mcp/http"
def get_tool_to_endpoint(self) -> dict[str, str]:
"""
Gets all of the tools that are exposed by the Gradio app and also
creates a mapping from the tool names to the endpoint names in the API docs.
"""
tool_to_endpoint = {}
for endpoint_name, endpoint_info in self.api_info["named_endpoints"].items():
if endpoint_info["show_api"]:
block_fn = self.get_block_fn_from_endpoint_name(endpoint_name)
if block_fn is None or block_fn.fn is None:
continue
fn_name = (
getattr(block_fn.fn, "__name__", None)
or (
hasattr(block_fn.fn, "__class__")
and getattr(block_fn.fn.__class__, "__name__", None)
)
or endpoint_name.lstrip("/")
)
tool_name = self.tool_prefix + fn_name
while tool_name in tool_to_endpoint:
tool_name = tool_name + "_"
tool_to_endpoint[tool_name] = endpoint_name
return tool_to_endpoint
def warn_about_state_inputs(self) -> None:
"""
Warn about tools that have gr.State inputs.
"""
for _, endpoint_name in self.tool_to_endpoint.items():
block_fn = self.get_block_fn_from_endpoint_name(endpoint_name)
if block_fn and any(isinstance(input, State) for input in block_fn.inputs):
warnings.warn(
"This MCP server includes a tool that has a gr.State input, which will not be "
"updated between tool calls. The original, default value of the State will be "
"used each time."
)
def create_mcp_server(self) -> Server:
"""
Create an MCP server for the given Gradio Blocks app.
Parameters:
blocks: The Blocks app to create the MCP server for.
Returns:
The MCP server.
"""
server = Server(str(self.blocks.title or "Gradio App"))
@server.call_tool()
async def call_tool(
name: str, arguments: dict[str, Any]
) -> list[types.TextContent | types.ImageContent]:
"""
Call a tool on the Gradio app.
Args:
name: The name of the tool to call.
arguments: The arguments to pass to the tool.
"""
context_request = self.mcp_server.request_context.request
if context_request is None:
raise ValueError(
"Could not find the request object in the MCP server context. This is not expected to happen. Please raise an issue: https://github.com/gradio-app/gradio."
)
route_path = self.get_route_path(context_request)
root_url = route_utils.get_root_url(
request=context_request,
route_path=route_path,
root_path=self.root_path,
)
_, filedata_positions = self.get_input_schema(name)
processed_kwargs = self.convert_strings_to_filedata(
arguments, filedata_positions
)
endpoint_name = self.tool_to_endpoint.get(name)
if endpoint_name is None:
raise ValueError(f"Unknown tool for this Gradio app: {name}")
block_fn = self.get_block_fn_from_endpoint_name(endpoint_name)
assert block_fn is not None # noqa: S101
if endpoint_name in self.api_info["named_endpoints"]:
parameters_info = self.api_info["named_endpoints"][endpoint_name][
"parameters"
]
processed_args = client_utils.construct_args(
parameters_info,
(),
processed_kwargs,
)
else:
processed_args = []
processed_args = self.insert_empty_state(block_fn.inputs, processed_args)
output = await self.blocks.process_api(
block_fn=block_fn,
inputs=processed_args,
request=context_request,
)
processed_args = self.pop_returned_state(block_fn.inputs, processed_args)
return self.postprocess_output_data(output["data"], root_url)
@server.list_tools()
async def list_tools() -> list[types.Tool]:
"""
List all tools on the Gradio app.
"""
tools = []
for tool_name, endpoint_name in self.tool_to_endpoint.items():
block_fn = self.get_block_fn_from_endpoint_name(endpoint_name)
assert block_fn is not None and block_fn.fn is not None # noqa: S101
description, parameters, returns = utils.get_function_description(
block_fn.fn
)
if returns:
description += (
("" if description.endswith(".") else ".")
+ " Returns: "
+ ", ".join(returns)
)
schema, _ = self.get_input_schema(tool_name, parameters)
tools.append(
types.Tool(
name=tool_name,
description=description,
inputSchema=schema,
)
)
return tools
return server
def launch_mcp_on_sse(self, app: Starlette, subpath: str, root_path: str) -> None:
"""
Launch the MCP server on the SSE transport.
Parameters:
app: The Gradio app to mount the MCP server on.
subpath: The subpath to mount the MCP server on. E.g. "/gradio_api/mcp"
root_path: The root path of the Gradio Blocks app.
"""
messages_path = "/messages/"
sse = SseServerTransport(messages_path)
self.root_path = root_path
async def handle_sse(request):
try:
async with sse.connect_sse(
request.scope, request.receive, request._send
) as streams:
await self.mcp_server.run(
streams[0],
streams[1],
self.mcp_server.create_initialization_options(),
)
return Response()
except Exception as e:
print(f"MCP SSE connection error: {str(e)}")
raise
app.mount(
subpath,
Starlette(
routes=[
Route(
"/schema",
endpoint=self.get_complete_schema, # Not required for MCP but used by the Hugging Face MCP server to get the schema for MCP Spaces without needing to establish an SSE connection
),
Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message),
Mount("/http/", app=self.handle_streamable_http),
],
),
)
def get_block_fn_from_endpoint_name(
self, endpoint_name: str
) -> "BlockFunction | None":
"""
Get the BlockFunction for a given endpoint name (e.g. "/predict").
Parameters:
endpoint_name: The name of the endpoint to get the BlockFunction for.
Returns:
The BlockFunction for the given endpoint name, or None if it is not found.
"""
block_fn = next(
(
fn
for fn in self.blocks.fns.values()
if fn.api_name == endpoint_name.lstrip("/")
),
None,
)
return block_fn
@staticmethod
def insert_empty_state(
inputs: Sequence["Component | BlockContext"], data: list
) -> list:
"""
Insert None placeholder values for any State input components, as State inputs
are not included in the endpoint schema.
"""
for i, input_component_type in enumerate(inputs):
if isinstance(input_component_type, State):
data.insert(i, None)
return data
@staticmethod
def pop_returned_state(
inputs: Sequence["Component | BlockContext"], data: list
) -> list:
"""
Remove any values corresponding to State output components from the data
as State outputs are not included in the endpoint schema.
"""
for i, input_component_type in enumerate(inputs):
if isinstance(input_component_type, State):
data.pop(i)
return data
def get_input_schema(
self,
tool_name: str,
parameters: dict[str, str] | None = None,
) -> tuple[dict[str, Any], list[list[str | int]]]:
"""
Get the input schema of the Gradio app API, appropriately formatted for MCP.
Parameters:
tool_name: The name of the tool to get the schema for, e.g. "predict"
parameters: The description and parameters of the tool to get the schema for.
Returns:
- The input schema of the Gradio app API.
- A list of positions of FileData objects in the input schema.
"""
endpoint_name = self.tool_to_endpoint.get(tool_name)
if endpoint_name is None:
raise ValueError(f"Unknown tool for this Gradio app: {tool_name}")
named_endpoints = self.api_info["named_endpoints"]
endpoint_info = named_endpoints.get(endpoint_name)
assert endpoint_info is not None # noqa: S101
schema = {
"type": "object",
"properties": {
p["parameter_name"]: {
**p["type"],
**(
{"description": parameters[p["parameter_name"]]}
if parameters and p["parameter_name"] in parameters
else {}
),
**(
{"default": p["parameter_default"]}
if "parameter_default" in p and p["parameter_default"]
else {}
),
}
for p in endpoint_info["parameters"]
},
}
return self.simplify_filedata_schema(schema)
async def get_complete_schema(self, request) -> JSONResponse: # noqa: ARG002
"""
Get the complete schema of the Gradio app API. For debugging purposes, also used by
the Hugging Face MCP server to get the schema for MCP Spaces without needing to
establish an SSE connection.
Parameters:
request: The Starlette request object.
Returns:
A JSONResponse containing a dictionary mapping tool names to their input schemas.
"""
if not self.api_info:
return JSONResponse({})
schemas = []
for tool_name, endpoint_name in self.tool_to_endpoint.items():
block_fn = self.get_block_fn_from_endpoint_name(endpoint_name)
assert block_fn is not None and block_fn.fn is not None # noqa: S101
description, parameters, returns = utils.get_function_description(
block_fn.fn
)
if returns:
description += (
("" if description.endswith(".") else ".")
+ " Returns: "
+ ", ".join(returns)
)
schema, _ = self.get_input_schema(tool_name, parameters)
info = {
"name": tool_name,
"description": description,
"inputSchema": schema,
}
schemas.append(info)
return JSONResponse(schemas)
def simplify_filedata_schema(
self, schema: dict[str, Any]
) -> tuple[dict[str, Any], list[list[str | int]]]:
"""
Parses a schema of a Gradio app API to identify positions of FileData objects. Replaces them with base64
strings while keeping track of their positions so that they can be converted back to FileData objects
later.
Parameters:
schema: The original schema of the Gradio app API.
Returns:
A tuple containing the simplified schema and the positions of the FileData objects.
"""
def is_gradio_filedata(obj: Any, defs: dict[str, Any]) -> bool:
if not isinstance(obj, dict):
return False
if "$ref" in obj:
ref = obj["$ref"]
if ref.startswith("#/$defs/"):
key = ref.split("/")[-1]
obj = defs.get(key, {})
else:
return False
props = obj.get("properties", {})
meta = props.get("meta", {})
if "$ref" in meta:
ref = meta["$ref"]
if ref.startswith("#/$defs/"):
key = ref.split("/")[-1]
meta = defs.get(key, {})
else:
return False
type_field = meta.get("properties", {}).get("_type", {})
default_type = meta.get("default", {}).get("_type")
return (
type_field.get("const") == "gradio.FileData"
or default_type == "gradio.FileData"
)
def traverse(
node: Any,
path: list[str | int] | None = None,
defs: dict[str, Any] | None = None,
) -> Any:
if path is None:
path = []
if defs is None:
defs = {}
if isinstance(node, dict):
if "$defs" in node:
defs.update(node["$defs"])
if is_gradio_filedata(node, defs):
filedata_positions.append(path.copy())
for key in ["properties", "additional_description", "$defs"]:
node.pop(key, None)
node["type"] = "string"
node["format"] = "a http or https url to a file"
result = {}
is_schema_root = "type" in node and "properties" in node
for key, value in node.items():
if is_schema_root and key == "properties":
result[key] = traverse(value, path, defs)
else:
path.append(key)
result[key] = traverse(value, path, defs)
path.pop()
return result
elif isinstance(node, list):
result = []
for i, item in enumerate(node):
path.append(i)
result.append(traverse(item, path, defs))
path.pop()
return result
return node
filedata_positions: list[list[str | int]] = []
simplified_schema = traverse(schema)
return simplified_schema, filedata_positions
def convert_strings_to_filedata(
self, value: Any, filedata_positions: list[list[str | int]]
) -> Any:
"""
Convert specific string values back to FileData objects based on their positions.
This is used to convert string values (as base64 encoded strings) to FileData
dictionaries so that they can be passed into .preprocess() logic of a Gradio app.
Parameters:
value: The input data to process, which can be an arbitrary nested data structure
that may or may not contain strings that should be converted to FileData objects.
filedata_positions: List of paths to positions in the input data that should be converted to FileData objects.
Returns:
The processed data with strings converted to FileData objects where appropriate. Base64
encoded strings are first saved to a temporary file and then converted to a FileData object.
Example:
>>> convert_strings_to_filedata(
{"image": "data:image/jpeg;base64,..."},
[["image"]]
)
>>> {'image': FileData(path='<temporary file path>')},
"""
def traverse(node: Any, path: list[str | int] | None = None) -> Any:
if path is None:
path = []
if isinstance(node, dict):
return {
key: traverse(value, path + [key]) for key, value in node.items()
}
elif isinstance(node, list):
return [traverse(item, path + [i]) for i, item in enumerate(node)]
elif isinstance(node, str) and path in filedata_positions:
if node.startswith("data:"):
# Even though base64 is not officially part of our schema, some MCP clients
# might return base64 encoded strings, so try to save it to a temporary file.
return FileData(
path=processing_utils.save_base64_to_cache(
node, DEFAULT_TEMP_DIR
)
)
elif node.startswith(("http://", "https://")):
return FileData(path=node)
else:
raise ValueError(
f"Invalid file data format, provide a url ('http://...' or 'https://...'). Received: {node}"
)
return node
return traverse(value)
@staticmethod
def get_image(file_path: str) -> Image.Image | None:
"""
If a filepath is a valid image, returns a PIL Image object. Otherwise returns None.
"""
if not os.path.exists(file_path):
return None
ext = os.path.splitext(file_path.lower())[1]
if ext not in Image.registered_extensions():
return None
try:
return Image.open(file_path)
except Exception:
return None
@staticmethod
def get_svg(file_data: Any) -> bytes | None:
"""
If a file_data is a valid FileDataDict with a url that is a data:image/svg+xml, returns bytes of the svg. Otherwise returns None.
"""
if isinstance(file_data, dict) and (url := file_data.get("url")):
if isinstance(url, str) and url.startswith("data:image/svg"):
return unquote(url.split(",", 1)[1]).encode()
else:
return None
else:
return None
@staticmethod
def get_base64_data(image: Image.Image, format: str) -> str:
"""
Returns a base64 encoded string of the image.
"""
buffer = BytesIO()
image.save(buffer, format=format)
return base64.b64encode(buffer.getvalue()).decode("utf-8")
def postprocess_output_data(
self, data: Any, root_url: str
) -> list[types.TextContent | types.ImageContent]:
"""
Postprocess the output data from the Gradio app to convert FileData objects back to base64 encoded strings.
Parameters:
data: The output data to postprocess.
"""
return_values = []
data = processing_utils.add_root_url(data, root_url, None)
for output in data:
if svg_bytes := self.get_svg(output):
base64_data = base64.b64encode(svg_bytes).decode("utf-8")
mimetype = "image/svg+xml"
svg_path = processing_utils.save_bytes_to_cache(
svg_bytes, f"{output['orig_name']}", DEFAULT_TEMP_DIR
)
svg_url = f"{root_url}/gradio_api/file={svg_path}"
return_value = [
types.ImageContent(
type="image", data=base64_data, mimeType=mimetype
),
types.TextContent(
type="text",
text=f"SVG Image URL: {svg_url}",
),
]
elif client_utils.is_file_obj_with_meta(output):
if image := self.get_image(output["path"]):
image_format = image.format or "png"
base64_data = self.get_base64_data(image, image_format)
mimetype = f"image/{image_format.lower()}"
return_value = [
types.ImageContent(
type="image", data=base64_data, mimeType=mimetype
),
types.TextContent(
type="text",
text=f"Image URL: {output['url'] or output['path']}",
),
]
else:
return_value = [
types.TextContent(
type="text", text=str(output["url"] or output["path"])
)
]
else:
return_value = [types.TextContent(type="text", text=str(output))]
return_values.extend(return_value)
return return_values
|