Skip to content

MCP Clients

Connect to MCP servers from applications and AI hosts.

Python Client

Basic Usage

from mcp.client import ClientSession
from mcp.client.stdio import stdio_client
import asyncio

async def main():
    # Connect to server via stdio
    async with stdio_client(
        command="python",
        args=["server.py"]
    ) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize connection
            await session.initialize()

            # List available tools
            tools = await session.list_tools()
            print("Tools:", [t.name for t in tools.tools])

            # Call a tool
            result = await session.call_tool(
                "calculate",
                {"expression": "2 + 2"}
            )
            print("Result:", result.content[0].text)

            # List resources
            resources = await session.list_resources()
            print("Resources:", [r.name for r in resources.resources])

            # Read a resource
            content = await session.read_resource("file:///data/config.json")
            print("Content:", content.contents[0].text)

asyncio.run(main())

SSE Client

from mcp.client.sse import sse_client

async def main():
    async with sse_client("http://localhost:8080/sse") as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # ... use session

LangChain Integration

MCP Tools in LangChain

from langchain.tools import BaseTool
from mcp.client import ClientSession
from mcp.client.stdio import stdio_client
from pydantic import BaseModel, Field
from typing import Type
import asyncio

class MCPToolWrapper(BaseTool):
    """Wrapper to use MCP tools in LangChain."""

    name: str
    description: str
    session: ClientSession
    args_schema: Type[BaseModel]

    def _run(self, **kwargs) -> str:
        return asyncio.run(self._arun(**kwargs))

    async def _arun(self, **kwargs) -> str:
        result = await self.session.call_tool(self.name, kwargs)
        return result.content[0].text

async def load_mcp_tools(server_command: str, server_args: list) -> list:
    """Load tools from an MCP server for LangChain."""

    async with stdio_client(
        command=server_command,
        args=server_args
    ) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            tools_response = await session.list_tools()
            langchain_tools = []

            for tool in tools_response.tools:
                # Create dynamic Pydantic model from schema
                schema_class = create_schema_from_json(tool.inputSchema)

                langchain_tools.append(MCPToolWrapper(
                    name=tool.name,
                    description=tool.description,
                    session=session,
                    args_schema=schema_class
                ))

            return langchain_tools

MCP Resources as Documents

from langchain_core.documents import Document

async def load_mcp_resources(session: ClientSession) -> list[Document]:
    """Load MCP resources as LangChain documents."""
    resources = await session.list_resources()
    documents = []

    for resource in resources.resources:
        content = await session.read_resource(resource.uri)
        documents.append(Document(
            page_content=content.contents[0].text,
            metadata={
                "uri": resource.uri,
                "name": resource.name,
                "mime_type": resource.mimeType
            }
        ))

    return documents

Custom Client

Connection Manager

from dataclasses import dataclass
from typing import Optional
import asyncio

@dataclass
class MCPConnection:
    command: str
    args: list
    session: Optional[ClientSession] = None
    _read_stream = None
    _write_stream = None

class MCPManager:
    """Manage multiple MCP server connections."""

    def __init__(self):
        self.connections: dict[str, MCPConnection] = {}

    async def connect(self, name: str, command: str, args: list):
        """Connect to an MCP server."""
        if name in self.connections:
            return

        read, write = await stdio_client(command=command, args=args).__aenter__()
        session = ClientSession(read, write)
        await session.__aenter__()
        await session.initialize()

        self.connections[name] = MCPConnection(
            command=command,
            args=args,
            session=session,
            _read_stream=read,
            _write_stream=write
        )

    async def disconnect(self, name: str):
        """Disconnect from an MCP server."""
        if name not in self.connections:
            return

        conn = self.connections[name]
        if conn.session:
            await conn.session.__aexit__(None, None, None)
        del self.connections[name]

    async def call_tool(self, server: str, tool: str, arguments: dict):
        """Call a tool on a specific server."""
        if server not in self.connections:
            raise ValueError(f"Not connected to {server}")

        session = self.connections[server].session
        return await session.call_tool(tool, arguments)

    async def list_all_tools(self) -> dict:
        """List tools from all connected servers."""
        all_tools = {}
        for name, conn in self.connections.items():
            tools = await conn.session.list_tools()
            all_tools[name] = tools.tools
        return all_tools

Usage

async def main():
    manager = MCPManager()

    # Connect to multiple servers
    await manager.connect("files", "python", ["file_server.py"])
    await manager.connect("db", "python", ["db_server.py"])

    # List all tools
    tools = await manager.list_all_tools()
    print(tools)

    # Call tools
    result = await manager.call_tool("files", "read_file", {"path": "/data/test.txt"})
    print(result.content[0].text)

    # Cleanup
    await manager.disconnect("files")
    await manager.disconnect("db")

Streaming Responses

async def stream_tool_call(session: ClientSession, tool: str, args: dict):
    """Stream responses from a tool call."""

    # Note: Streaming depends on server implementation
    result = await session.call_tool(tool, args)

    for content in result.content:
        if content.type == "text":
            print(content.text)
        elif content.type == "image":
            # Handle image content
            pass

Error Handling

from mcp.types import ErrorCode

async def safe_call_tool(session: ClientSession, tool: str, args: dict):
    """Call a tool with error handling."""
    try:
        result = await session.call_tool(tool, args)

        # Check for error content
        for content in result.content:
            if content.type == "text" and content.text.startswith("Error:"):
                return {"success": False, "error": content.text}

        return {"success": True, "result": result}

    except Exception as e:
        return {"success": False, "error": str(e)}

Client Configuration

From JSON Config

import json
from pathlib import Path

async def load_servers_from_config(config_path: str) -> MCPManager:
    """Load MCP servers from configuration file."""
    config = json.loads(Path(config_path).read_text())
    manager = MCPManager()

    for name, server_config in config.get("mcpServers", {}).items():
        command = server_config["command"]
        args = server_config.get("args", [])
        env = server_config.get("env", {})

        await manager.connect(name, command, args, env=env)

    return manager

# config.json format:
# {
#   "mcpServers": {
#     "filesystem": {
#       "command": "uvx",
#       "args": ["mcp-server-filesystem", "/data"]
#     }
#   }
# }

See Also