Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions docs/migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,184 @@ server = Server("my-server")
server.experimental.enable_tasks(on_get_task=custom_get_task)
```

### Transport Abstractions Refactored

The session hierarchy has been refactored to support pluggable transport implementations. This introduces several breaking changes:

#### New `AbstractBaseSession` Protocol

A new runtime-checkable Protocol, `AbstractBaseSession`, establishes a transport-agnostic contract for all MCP sessions. It ensures that client and server sessions share a consistent communication interface regardless of the transport used.

##### Key characteristics of `AbstractBaseSession`

To maintain a clean architectural boundary, `AbstractBaseSession` is a pure interface—it defines what methods must exist but does not manage how they work.

* **No State Management**: The protocol does not handle internal machinery like task groups, response streams, or buffers.

* **Implementation Ownership**: The concrete class is fully responsible for managing its own state and lifecycle for how it sends and receives data.

* **No Inheritance Needed**: As a structural protocol, you no longer need to call **super().\_\_init\_\_()** or inherit from a base class to satisfy the contract.

##### Motivation: Interface vs. Implementation

Previously, all custom sessions were required to inherit from BaseSession, which locked them into a specific architecture involving memory streams and JSON-RPC message routing, but with this change, users have the flexibility to implement their own transport logic with either of the following options:

* **BaseSession (Implementation)**: Remains available for transports that follow the standard pattern of reading and writing JSON-RPC messages over streams. You can continue to inherit from this if you want the SDK to handle message linking and routing for you.

* **AbstractBaseSession (Interface)**: A new stateless protocol for transports that do not use standard streams or JSON-RPC. It defines the "what" (method signatures) without enforcing the "how" (internal machinery).

**Before:**

```python
from mcp.shared.session import BaseSession

# Locked into the BaseSession implementation details
class MyCustomSession(BaseSession[...]):
def __init__(self, read_stream, write_stream):
# Mandatory: must use streams and JSON-RPC machinery
super().__init__(read_stream, write_stream)
```

**After:**

```python
# OPTION A: Continue using BaseSession (for stream-based JSON-RPC)
class MyStreamSession(BaseSession):
...

# OPTION B: Use AbstractBaseSession (for non-stream/custom transports)
class MyDirectApiSession: # Satisfies AbstractBaseSession protocol
def __init__(self):
# No streams or super().__init__() required.
# Manage your own custom transport logic here.
self._client = CustomTransportClient()

async def send_request(self, ...):
return await self._client.execute(...)
```

#### Introduction of the `BaseClientSession` Protocol

A new runtime-checkable Protocol, BaseClientSession, has been introduced to establish a common interface for all MCP client sessions. This protocol defines the essential methods—such as `send_request`, `send_notification`, and `initialize`, etc. — that a session must implement to be compatible with the SDK's client utilities.

The primary goal of this protocol is to ensure that the high-level session logic remains consistent irrespective of the underlying transport. Custom session implementations can now satisfy the SDK's requirements simply by implementing the defined methods. No explicit inheritance is required.

```python
from mcp.client.base_client_session import BaseClientSession

class MyCustomTransportSession:
"""
A custom session implementation. It doesn't need to inherit from
BaseClientSession to be compatible.
"""
async def initialize(self) -> InitializeResult:
...

async def send_request(self, ...) -> Any:
...

# Implementing these methods makes this class a 'BaseClientSession'
```

Because the protocol is `@runtime_checkable`, you can verify that any session object adheres to the required structure using standard Python checks:

```python
def start_client(session: BaseClientSession):
# This works for any object implementing the protocol
if not isinstance(session, BaseClientSession):
raise TypeError("Session must implement the BaseClientSession protocol")
```

#### `ClientRequestContext` type changed

`ClientRequestContext` is now `RequestContext[BaseClientSession]` instead of `RequestContext[ClientSession]`. This means callbacks receive the more general `BaseClientSession` type, which may not have all methods available on `ClientSession`.

**Before:**

```python
from mcp.client.context import ClientRequestContext
from mcp.client.session import ClientSession

async def my_callback(context: ClientRequestContext) -> None:
# Could access ClientSession-specific methods
caps = context.session.get_server_capabilities()
```

**After:**

```python
from mcp.client.context import ClientRequestContext
from mcp.client.session import ClientSession

async def my_callback(context: ClientRequestContext) -> None:
# context.session is BaseClientSession - narrow the type if needed
if isinstance(context.session, ClientSession):
caps = context.session.get_server_capabilities()
```

#### Callback protocols are now generic

`sampling_callback`, `elicitation_callback`, and `list_roots_callback` protocols now require explicit type parameters.

**Before:**

```python
from mcp.client.session import SamplingFnT

async def my_sampling(context, params) -> CreateMessageResult:
...

# Type inferred as SamplingFnT
session = ClientSession(..., sampling_callback=my_sampling)
```

**After:**

```python
from mcp.client.session import SamplingFnT, ClientSession

async def my_sampling(
context: RequestContext[ClientSession],
params: CreateMessageRequestParams
) -> CreateMessageResult:
...

# Explicit type annotation recommended
my_sampling_typed: SamplingFnT[ClientSession] = my_sampling
session = ClientSession(..., sampling_callback=my_sampling_typed)
```

#### Internal TypeVars Renamed and Variance Updated

To support structural subtyping and ensure type safety across the new Protocol hierarchy, several internal TypeVariables have been updated with explicit variance (covariant or contravariant). These have been renamed to follow PEP 484 naming conventions.

| Original Name | Updated Name | Variance | Purpose |
| --- | --- | --- | --- |
| SessionT | SessionT_co | Covariant | Allows specialized sessions to be used where a base session is expected. |
| SendRequestT | SendRequestT_contra | Contravariant | Ensures request types can be safely handled by generic handlers. |
| SendNotificationT | SendNotificationT_contra | Contravariant | Ensures notification types are handled safely across the hierarchy. |
| ReceiveResultT | ReceiveResultT_co | Covariant | Handles successful response models safely. |

**Before:**

```python
SessionT_co = TypeVar("SessionT_co", bound="AbstractBaseSession[Any, Any]", covariant=True)
SendRequestT = TypeVar("SendRequestT", ClientRequest, ServerRequest)
SendNotificationT = TypeVar("SendNotificationT", ClientNotification, ServerNotification)
ReceiveResultT = TypeVar("ReceiveResultT", bound=BaseModel)
```

**After:**

```python
SessionT_co = TypeVar("SessionT_co", bound="AbstractBaseSession[Any, Any]", covariant=True)
SendRequestT_contra = TypeVar("SendRequestT_contra", ClientRequest, ServerRequest, contravariant=True)
SendNotificationT_contra = TypeVar(
"SendNotificationT_contra", ClientNotification, ServerNotification, contravariant=True
)
ReceiveResultT_co = TypeVar("ReceiveResultT_co", bound=BaseModel, covariant=True)
```

## Deprecations

<!-- Add deprecations below -->
Expand Down
65 changes: 65 additions & 0 deletions examples/pluggable_transport/client_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Example demonstrating how the high-level `Client` class interacts with
the `BaseClientSession` abstraction for callbacks.
"""

import asyncio

from mcp import Client
from mcp.client.base_client_session import BaseClientSession
from mcp.server.mcpserver import MCPServer
from mcp.shared._context import RequestContext
from mcp.types import (
CreateMessageRequestParams,
CreateMessageResult,
TextContent,
)


async def main():
# 1. Create a simple server with a tool that requires sampling
server = MCPServer("ExampleServer")

@server.tool("ask_assistant")
async def ask_assistant(message: str) -> str:
# The tool asks the client to sample a message (requires the sampling callback)
print(f"[Server] Received request: {message}")
result = await server.get_context().session.create_message(
messages=[{"role": "user", "content": {"type": "text", "text": message}}],
max_tokens=100,
)
return f"Assistant replied: {result.content.text}"

# 2. Define a callback typed against the abstract `BaseClientSession`.
# Notice that we are NOT tied to `ClientSession` streams here!
# Because of the contravariance assigned to `ClientSessionT_contra` in the
# Protocol, this callback is a completely valid mathematical subtype of the
# `SamplingFnT[ClientSession]` expected by `Client` during instantiation.
async def abstract_sampling_callback(
context: RequestContext[BaseClientSession], params: CreateMessageRequestParams
) -> CreateMessageResult:
print("[Client Callback] Server requested sampling via abstract callback!")

# We can safely use `BaseClientSession` abstract methods on `context.session`.
return CreateMessageResult(
role="assistant",
content=TextContent(type="text", text="Hello from the abstract callback!"),
model="gpt-test",
stop_reason="endTurn",
)

# 3. Instantiate the Client, injecting our abstract callback.
# The SDK automatically handles the underlying streams and creates the concrete
# `ClientSession`, which safely fulfills the `BaseClientSession` contract our
# callback expects.
async with Client(server, sampling_callback=abstract_sampling_callback) as client:
print("Executing tool 'ask_assistant' from the Client...")
result = await client.call_tool("ask_assistant", {"message": "Please say hello"})

if not result.is_error:
for content in result.content:
if isinstance(content, TextContent):
print(f"Server Tool Output: {content.text}")


if __name__ == "__main__":
asyncio.run(main())
141 changes: 141 additions & 0 deletions examples/pluggable_transport/custom_transport_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""Example demonstrating how to implement a custom transport
that complies with `BaseClientSession` without using read/write streams or JSON-RPC.
"""

import asyncio
from typing import Any

from mcp import types
from mcp.client.base_client_session import BaseClientSession
from mcp.shared.session import ProgressFnT


class CustomDirectSession:
"""A custom MCP session that communicates with a hypothetical internal API
rather than using streaming JSON-RPC.

It satisfies the `BaseClientSession` protocol simply by implementing the required
methods – no inheritance from `BaseSession` or stream initialization required!
"""

async def initialize(self) -> types.InitializeResult:
print("[CustomSession] Initializing custom transport...")
return types.InitializeResult(
protocolVersion="2024-11-05",
capabilities=types.ServerCapabilities(),
serverInfo=types.Implementation(name="CustomDirectServer", version="1.0.0"),
)

async def list_tools(self, *, params: types.PaginatedRequestParams | None = None) -> types.ListToolsResult:
print("[CustomSession] Fetching tools...")
return types.ListToolsResult(
tools=[
types.Tool(
name="direct_tool",
description="A tool executed via direct internal Python call",
inputSchema={"type": "object", "properties": {}},
)
]
)

async def call_tool(
self,
name: str,
arguments: dict[str, Any] | None = None,
read_timeout_seconds: float | None = None,
progress_callback: ProgressFnT | None = None,
*,
meta: types.RequestParamsMeta | None = None,
) -> types.CallToolResult:
print(f"[CustomSession] Executing tool '{name}'...")
return types.CallToolResult(
content=[
types.TextContent(
type="text", text=f"Hello from the custom transport! Tool '{name}' executed successfully."
)
]
)

# Note: To fully satisfy the structural protocol of BaseClientSession for static
# type checking (mypy/pyright), all protocol methods must be defined.
# Here we stub the remaining methods for brevity.
async def send_ping(self, *, meta: types.RequestParamsMeta | None = None) -> types.EmptyResult:
return types.EmptyResult()

async def send_request(self, *args: Any, **kwargs: Any) -> Any:
raise NotImplementedError()

async def send_notification(self, *args: Any, **kwargs: Any) -> None:
raise NotImplementedError()

async def send_progress_notification(self, *args: Any, **kwargs: Any) -> None:
raise NotImplementedError()

async def list_resources(self, *args: Any, **kwargs: Any) -> Any:
raise NotImplementedError()

async def list_resource_templates(self, *args: Any, **kwargs: Any) -> Any:
raise NotImplementedError()

async def read_resource(self, *args: Any, **kwargs: Any) -> Any:
raise NotImplementedError()

async def subscribe_resource(self, *args: Any, **kwargs: Any) -> Any:
raise NotImplementedError()

async def unsubscribe_resource(self, *args: Any, **kwargs: Any) -> Any:
raise NotImplementedError()

async def list_prompts(self, *args: Any, **kwargs: Any) -> Any:
raise NotImplementedError()

async def get_prompt(self, *args: Any, **kwargs: Any) -> Any:
raise NotImplementedError()

async def complete(self, *args: Any, **kwargs: Any) -> Any:
raise NotImplementedError()

async def set_logging_level(self, *args: Any, **kwargs: Any) -> Any:
raise NotImplementedError()

async def send_roots_list_changed(self, *args: Any, **kwargs: Any) -> None:
raise NotImplementedError()


# ---------------------------------------------------------------------------
# Using the session with code strictly typed against BaseClientSession
# ---------------------------------------------------------------------------

async def interact_with_mcp(session: BaseClientSession) -> None:
"""This function doesn't know or care if the session is communicating
via stdio streams, SSE, or a custom internal API!
It only depends on the abstract `BaseClientSession` methods.
"""

# 1. Initialize
init_result = await session.initialize()
print(f"Connected to: {init_result.serverInfo.name}@{init_result.serverInfo.version}")

# 2. List Tools
tools_result = await session.list_tools()
for tool in tools_result.tools:
print(f"Found tool: {tool.name} - {tool.description}")

# 3. Call Tool
if tools_result.tools:
call_result = await session.call_tool(tools_result.tools[0].name, arguments={})
for content in call_result.content:
if isinstance(content, types.TextContent):
print(f"Tool Output: {content.text}")


async def main():
# Instantiate our custom non-streaming transport session
custom_session = CustomDirectSession()

# Pass it to the generic runner!
await interact_with_mcp(custom_session)


if __name__ == "__main__":
asyncio.run(main())
Loading
Loading