# MCPSynapseChat / mcp_client.py
# Omar ID EL MOUMEN
# Final version
# 8227e25
from typing import Optional
from contextlib import AsyncExitStack
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from websockets import ClientConnection
class MCPClient:
    """Client for an MCP server reached over the stdio transport.

    Typical use: ``await client.connect()``, then ``await client.list_tools()``,
    and finally ``await client.cleanup()`` to release everything that
    ``connect()`` opened.
    """

    def __init__(self):
        # Active MCP session; stays None until connect() has fully succeeded.
        self.session: Optional[ClientSession] = None
        # Owns every async context entered by connect() so that cleanup()
        # can unwind them in reverse order.
        self.exit_stack = AsyncExitStack()
        # The two halves of the stdio transport, assigned by connect().
        self.stdio = None
        self.write = None
        # Websocket connection slot; not used by the methods of this class.
        self.ws: Optional[ClientConnection] = None

    async def connect(self):
        """Open the stdio transport and an initialized MCP session.

        The server is started with ``uv --directory /app run server.py``.
        Both the transport and the session are registered on
        ``self.exit_stack``; call :meth:`cleanup` to close them.
        """
        server_params = StdioServerParameters(
            command="uv",
            args=["--directory", "/app", "run", "server.py"],
            env=None,
        )
        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.stdio, self.write = stdio_transport
        session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )
        await session.initialize()
        # Publish the session only after the handshake succeeded, so a failed
        # initialize() never leaves a half-ready session visible to callers.
        self.session = session

    async def list_tools(self) -> list:
        """Return the server's tools as a list of plain dictionaries.

        Each dictionary has the keys ``"name"``, ``"description"`` and
        ``"parameters"`` (the tool's input schema).

        Raises:
            RuntimeError: if no session is available because ``connect()``
                has not been called (or ``cleanup()`` already ran).
        """
        if self.session is None:
            raise RuntimeError(
                "MCPClient is not connected; call connect() before list_tools()."
            )
        response = await self.session.list_tools()
        return [
            {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema,
            }
            for tool in response.tools
        ]

    async def cleanup(self):
        """Close everything opened by ``connect()`` and reset the client.

        Safe to call on a client that never connected.
        """
        try:
            await self.exit_stack.aclose()
        finally:
            # Drop the references even if closing raised, so later calls fail
            # fast in list_tools() instead of using a dead session.
            self.session = None
            self.stdio = None
            self.write = None