MCP Integration Guide¶
This guide explains how to use OpenMAS with MCP (Model Context Protocol) version 1.7.1. MCP is a protocol that enables standardized communication between language models, tools, and other services.
MCP 1.7.1 Features¶
OpenMAS integrates with MCP 1.7.1, which provides:
- Stable SSE transport implementation
- Robust connection resilience and error handling
- Efficient serialization of complex data types
- Comprehensive support for tool calling with error handling
Prerequisites¶
To use MCP with OpenMAS, you need to install the mcp
package:
Or if you're installing OpenMAS with poetry:
Supported Transports¶
MCP supports two primary transport mechanisms:
- SSE (Server-Sent Events) - Used for HTTP-based communication between agents
- STDIO - Used for direct pipe-based communication, typically for local processes
Creating MCP Communicators¶
SSE Communicator (Server Mode)¶
from openmas.communication.mcp.sse_communicator import McpSseCommunicator
# Create a server-mode communicator
communicator = McpSseCommunicator(
agent_name="tool_provider",
service_urls={}, # Server doesn't need service URLs
server_mode=True,
http_port=8081, # Optional: specify a port (defaults to 8080)
)
SSE Communicator (Client Mode)¶
from openmas.communication.mcp.sse_communicator import McpSseCommunicator
# Create a client-mode communicator
communicator = McpSseCommunicator(
agent_name="tool_user",
service_urls={"service_name": "http://localhost:8081"},
server_mode=False, # Client mode
)
STDIO Communicator¶
from openmas.communication.mcp.stdio_communicator import McpStdioCommunicator
# Create a STDIO communicator
communicator = McpStdioCommunicator(
agent_name="tool_provider",
service_urls={}, # Not used for STDIO
)
Registering and Using Tools¶
Registering a Tool (Server-side)¶
async def process_text_handler(payload: Dict[str, Any]) -> Dict[str, Any]:
"""Process text by converting to uppercase and counting words."""
# Get the input text from the payload
text = payload.get("text", "")
# Process the text
processed_text = text.upper()
word_count = len(text.split())
# Return the result
return {
"processed_text": processed_text,
"word_count": word_count,
"status": "success"
}
# Register the tool with the communicator
await communicator.register_tool(
name="process_text",
description="Process text input by converting to uppercase and counting words",
function=process_text_handler,
)
Calling a Tool (Client-side)¶
# Create a payload for the tool
payload = {
"text": "Hello, this is a test message.",
# Optionally add a content field in MCP 1.7.1 format
"content": [{"type": "text", "text": "Hello, this is a test message."}],
}
# Call the tool
result = await communicator.call_tool(
target_service="tool_provider",
tool_name="process_text",
arguments=payload,
timeout=10.0, # Optional timeout in seconds
)
Error Handling¶
MCP 1.7.1 provides improved error handling. Here's how to handle errors properly:
from openmas.exceptions import CommunicationError
try:
result = await communicator.call_tool(
target_service="tool_provider",
tool_name="process_text",
arguments=payload,
)
# Process successful result
except asyncio.TimeoutError:
# Handle timeout
print("Tool call timed out")
except CommunicationError as e:
# Handle communication error
print(f"Communication error: {e}")
except Exception as e:
# Handle other errors
print(f"Unexpected error: {e}")
Complete Example¶
See the following examples for complete implementations:
- SSE Example:
examples/example_08_mcp/01_mcp_sse_tool_call/
- STDIO Example:
examples/example_08_mcp/02_mcp_stdio_tool_call/
Best Practices¶
- Proper Initialization: Always initialize and start your communicator before using it.
- Error Handling: Add appropriate error handling for all tool calls.
- Resource Cleanup: Ensure proper cleanup of resources with try/finally blocks.
- Timeout Handling: Set appropriate timeouts for tool calls to prevent hanging.
- Graceful Shutdown: Always shut down communicators properly when your agent stops.
Known Limitations¶
- SSE connections can be sensitive to network issues - implement retry logic for production systems.
- The
test_openmas_mcp_sse_integration
test is currently skipped due to SSE endpoint compatibility issues. - The
test_concurrent_sse_connections
test is skipped due to asyncio event loop issues that need to be addressed.
Troubleshooting¶
- 404 Not Found errors: Ensure the server is running and the URL is correct.
- Connection refused: Check that the server is running and the port is correct.
- Event loop is closed errors: These can occur during cleanup. They're typically harmless but indicate a resource wasn't closed properly.
Future Improvements¶
- Enhanced resilience for SSE connections with automatic reconnection.
- Better handling of concurrent connections.
- More comprehensive end-to-end tests.