WebCrawler / src /utils /mcp_client.py
Carlos Gonzalez
Add application file
b1f90a5
import inspect
import logging
import uuid
from datetime import date, datetime, time
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Type, Union, get_type_hints
from browser_use.controller.registry.views import ActionModel
from langchain.tools import BaseTool
from langchain_mcp_adapters.client import MultiServerMCPClient
from pydantic import BaseModel, Field, create_model
from pydantic.v1 import BaseModel, Field
logger = logging.getLogger(__name__)
async def setup_mcp_client_and_tools(mcp_server_config: Dict[str, Any]) -> Optional[MultiServerMCPClient]:
"""
Initializes the MultiServerMCPClient, connects to servers, fetches tools,
filters them, and returns a flat list of usable tools and the client instance.
Returns:
A tuple containing:
- list[BaseTool]: The filtered list of usable LangChain tools.
- MultiServerMCPClient | None: The initialized and started client instance, or None on failure.
"""
logger.info("Initializing MultiServerMCPClient...")
if not mcp_server_config:
logger.error("No MCP server configuration provided.")
return None
try:
if "mcpServers" in mcp_server_config:
mcp_server_config = mcp_server_config["mcpServers"]
client = MultiServerMCPClient(mcp_server_config)
await client.__aenter__()
return client
except Exception as e:
logger.error(f"Failed to setup MCP client or fetch tools: {e}", exc_info=True)
return None
def create_tool_param_model(tool: BaseTool) -> Type[BaseModel]:
"""Creates a Pydantic model from a LangChain tool's schema"""
# Get tool schema information
json_schema = tool.args_schema
tool_name = tool.name
# If the tool already has a schema defined, convert it to a new param_model
if json_schema is not None:
# Create new parameter model
params = {}
# Process properties if they exist
if 'properties' in json_schema:
# Find required fields
required_fields: Set[str] = set(json_schema.get('required', []))
for prop_name, prop_details in json_schema['properties'].items():
field_type = resolve_type(prop_details, f"{tool_name}_{prop_name}")
# Check if parameter is required
is_required = prop_name in required_fields
# Get default value and description
default_value = prop_details.get('default', ... if is_required else None)
description = prop_details.get('description', '')
# Add field constraints
field_kwargs = {'default': default_value}
if description:
field_kwargs['description'] = description
# Add additional constraints if present
if 'minimum' in prop_details:
field_kwargs['ge'] = prop_details['minimum']
if 'maximum' in prop_details:
field_kwargs['le'] = prop_details['maximum']
if 'minLength' in prop_details:
field_kwargs['min_length'] = prop_details['minLength']
if 'maxLength' in prop_details:
field_kwargs['max_length'] = prop_details['maxLength']
if 'pattern' in prop_details:
field_kwargs['pattern'] = prop_details['pattern']
# Add to parameters dictionary
params[prop_name] = (field_type, Field(**field_kwargs))
return create_model(
f'{tool_name}_parameters',
__base__=ActionModel,
**params, # type: ignore
)
# If no schema is defined, extract parameters from the _run method
run_method = tool._run
sig = inspect.signature(run_method)
# Get type hints for better type information
try:
type_hints = get_type_hints(run_method)
except Exception:
type_hints = {}
params = {}
for name, param in sig.parameters.items():
# Skip 'self' parameter and any other parameters you want to exclude
if name == 'self':
continue
# Get annotation from type hints if available, otherwise from signature
annotation = type_hints.get(name, param.annotation)
if annotation == inspect.Parameter.empty:
annotation = Any
# Use default value if available, otherwise make it required
if param.default != param.empty:
params[name] = (annotation, param.default)
else:
params[name] = (annotation, ...)
return create_model(
f'{tool_name}_parameters',
__base__=ActionModel,
**params, # type: ignore
)
def resolve_type(prop_details: Dict[str, Any], prefix: str = "") -> Any:
"""Recursively resolves JSON schema type to Python/Pydantic type"""
# Handle reference types
if '$ref' in prop_details:
# In a real application, reference resolution would be needed
return Any
# Basic type mapping
type_mapping = {
'string': str,
'integer': int,
'number': float,
'boolean': bool,
'array': List,
'object': Dict,
'null': type(None),
}
# Handle formatted strings
if prop_details.get('type') == 'string' and 'format' in prop_details:
format_mapping = {
'date-time': datetime,
'date': date,
'time': time,
'email': str,
'uri': str,
'url': str,
'uuid': uuid.UUID,
'binary': bytes,
}
return format_mapping.get(prop_details['format'], str)
# Handle enum types
if 'enum' in prop_details:
enum_values = prop_details['enum']
# Create dynamic enum class with safe names
enum_dict = {}
for i, v in enumerate(enum_values):
# Ensure enum names are valid Python identifiers
if isinstance(v, str):
key = v.upper().replace(' ', '_').replace('-', '_')
if not key.isidentifier():
key = f"VALUE_{i}"
else:
key = f"VALUE_{i}"
enum_dict[key] = v
# Only create enum if we have values
if enum_dict:
return Enum(f"{prefix}_Enum", enum_dict)
return str # Fallback
# Handle array types
if prop_details.get('type') == 'array' and 'items' in prop_details:
item_type = resolve_type(prop_details['items'], f"{prefix}_item")
return List[item_type] # type: ignore
# Handle object types with properties
if prop_details.get('type') == 'object' and 'properties' in prop_details:
nested_params = {}
for nested_name, nested_details in prop_details['properties'].items():
nested_type = resolve_type(nested_details, f"{prefix}_{nested_name}")
# Get required field info
required_fields = prop_details.get('required', [])
is_required = nested_name in required_fields
default_value = nested_details.get('default', ... if is_required else None)
description = nested_details.get('description', '')
field_kwargs = {'default': default_value}
if description:
field_kwargs['description'] = description
nested_params[nested_name] = (nested_type, Field(**field_kwargs))
# Create nested model
nested_model = create_model(f"{prefix}_Model", **nested_params)
return nested_model
# Handle union types (oneOf, anyOf)
if 'oneOf' in prop_details or 'anyOf' in prop_details:
union_schema = prop_details.get('oneOf') or prop_details.get('anyOf')
union_types = []
for i, t in enumerate(union_schema):
union_types.append(resolve_type(t, f"{prefix}_{i}"))
if union_types:
return Union.__getitem__(tuple(union_types)) # type: ignore
return Any
# Handle allOf (intersection types)
if 'allOf' in prop_details:
nested_params = {}
for i, schema_part in enumerate(prop_details['allOf']):
if 'properties' in schema_part:
for nested_name, nested_details in schema_part['properties'].items():
nested_type = resolve_type(nested_details, f"{prefix}_allOf_{i}_{nested_name}")
# Check if required
required_fields = schema_part.get('required', [])
is_required = nested_name in required_fields
nested_params[nested_name] = (nested_type, ... if is_required else None)
# Create composite model
if nested_params:
composite_model = create_model(f"{prefix}_CompositeModel", **nested_params)
return composite_model
return Dict
# Default to basic types
schema_type = prop_details.get('type', 'string')
if isinstance(schema_type, list):
# Handle multiple types (e.g., ["string", "null"])
non_null_types = [t for t in schema_type if t != 'null']
if non_null_types:
primary_type = type_mapping.get(non_null_types[0], Any)
if 'null' in schema_type:
return Optional[primary_type] # type: ignore
return primary_type
return Any
return type_mapping.get(schema_type, Any)