Communication in OpenMAS¶
This document provides an overview of the communication protocols in OpenMAS, explaining when and how to use each one.
Communication Overview¶
OpenMAS provides a flexible communication layer that abstracts away the complexity of different communication protocols. The core of this system is the BaseCommunicator
abstract base class, which defines the standard interface for sending requests (send_request
), sending notifications (send_notification
), and registering handlers for incoming messages (register_handler
). All specific protocol implementations (like HttpCommunicator
, McpSseCommunicator
, etc.) inherit from BaseCommunicator
.
An agent's communicator is typically instantiated automatically by BaseAgent
based on the communicator_type
and communicator_options
specified in the agent's configuration.
Lazy Loading of Communicators¶
A key design principle in OpenMAS is lazy loading for optional components, especially communicators that require extra dependencies.
- Core vs. Optional: The
HttpCommunicator
(usinghttpx
) might be considered core or have minimal dependencies. Communicators for MCP (mcp
), gRPC (grpcio
), and MQTT (paho-mqtt
) require specific third-party libraries. - Mechanism: OpenMAS does not require you to install all possible communication libraries just to use the core framework. When
BaseAgent
initializes, it looks at the configuredcommunicator_type
. If it's a non-core type (e.g., "grpc", "mcp_sse", "mqtt"), the framework attempts to dynamically import the necessary communicator class (GrpcCommunicator
,McpSseCommunicator
, etc.) usingimportlib
. - Dependency Management: If the import fails because the required underlying library (e.g.,
grpcio
forGrpcCommunicator
) is not installed in your environment, OpenMAS will raise anImportError
or a specificConfigurationError
guiding you to install the necessary optional dependency. - Installation: You install support for optional communicators using pip extras:
(Use
# Install core openmas pip install openmas # Install support for gRPC pip install 'openmas[grpc]' # Install support for MCP (includes mcp-sdk) pip install 'openmas[mcp]' # Install support for MQTT pip install 'openmas[mqtt]' # Install multiple extras pip install 'openmas[grpc,mcp]' # Install all optional communicators and features pip install 'openmas[all]'
poetry add 'openmas[extra]'
if using Poetry).
This approach keeps the core openmas
package lightweight and ensures users only install what they need.
Available Protocols & Communicators¶
OpenMAS provides multiple communication protocols to suit different needs:
HTTP (HttpCommunicator
)¶
- Protocol: Standard HTTP/1.1 (using
httpx
). - Best For: RESTful API interactions, web service integration, general-purpose request/response communication between agents/services across processes or machines.
- Dependencies:
httpx
(likely a core dependency). - Configuration:
communicator_type: http
. Options likehttp_port
(server mode),timeout
,retries
can be set incommunicator_options
.
Model Context Protocol (MCP)¶
MCP is designed for interacting with AI models and tools, particularly from Anthropic. OpenMAS provides two communicators for MCP. Requires openmas[mcp]
.
-
McpSseCommunicator
- Protocol: MCP over HTTP Server-Sent Events (SSE).
- Best For: Networked MCP communication. Running an agent as an HTTP-based MCP server (e.g., for a UI to connect to) or connecting to remote HTTP-based MCP services.
- Dependencies:
mcp
SDK,fastapi
,uvicorn
,httpx
. - Configuration:
communicator_type: mcp_sse
. Options:server_mode
(bool),http_port
(server mode).
-
McpStdioCommunicator
- Protocol: MCP over standard input/output.
- Best For: Running an agent as an MCP service interacted with via stdin/stdout (e.g., as a CLI tool or managed subprocess). Connecting to external tools/engines that expose an MCP interface via stdio (like Stockfish configured for MCP).
- Dependencies:
mcp
SDK. - Configuration:
communicator_type: mcp_stdio
. Options:server_mode
(bool).
gRPC (GrpcCommunicator
)¶
Experimental Feature
Please note that the GrpcCommunicator
is considered experimental in OpenMAS v0.1.0. While providing basic gRPC functionality, it has undergone limited testing compared to the core HTTP and MCP communicators and may have an unstable API subject to change in future releases. It may lack comprehensive error handling or support for advanced gRPC features like streaming patterns. It is not recommended for use in production environments at this stage. Feedback and contributions to improve gRPC integration are welcome.
- Protocol: gRPC (using
grpcio
). - Best For: High-performance, low-latency RPC between services, potentially across different languages. Streaming scenarios. Microservice architectures where gRPC is standard. Requires
openmas[grpc]
. - Dependencies:
grpcio
,grpcio-tools
. - Configuration:
communicator_type: grpc
. Options:server_mode
(bool),grpc_port
(server mode). - Note: The default implementation uses a generic dictionary format. For type-safe communication using Protobuf definitions, customization might be needed.
MQTT (MqttCommunicator
)¶
Experimental Feature
Please note that the MqttCommunicator
is considered experimental in OpenMAS v0.1.0. While providing basic MQTT connectivity, it has undergone limited testing compared to the core HTTP and MCP communicators and may have an unstable API subject to change in future releases. Known limitations might include incomplete support for all MQTT QoS levels or specific broker configurations. It is not recommended for use in production environments at this stage. Feedback and contributions to enhance MQTT support are welcome.
- Protocol: MQTT (using
paho-mqtt
). - Best For: Publish/subscribe messaging patterns, event-driven architectures, decoupled communication, IoT applications. Requires an external MQTT broker. Requires
openmas[mqtt]
. - Dependencies:
paho-mqtt
. - Configuration:
communicator_type: mqtt
. Options:broker_host
,broker_port
,username
,password
,client_id
, TLS settings.
Choosing the Right Protocol¶
Scenario | Recommended Communicator(s) | Why? |
---|---|---|
General Request/Response between Agents/Services | HttpCommunicator |
Standard, widely supported, good for RESTful patterns. |
Interacting with Anthropic Models/MCP Services | McpSseCommunicator |
Native MCP support over standard web protocols. |
Exposing Agent as HTTP-based MCP Server | McpSseCommunicator |
Allows web clients/other services to interact via MCP over HTTP/SSE. |
Agent as CLI Tool using MCP | McpStdioCommunicator |
Uses stdin/stdout for MCP interaction. |
Connecting to MCP Tool via Subprocess (e.g. Stockfish) | McpStdioCommunicator |
Manages subprocess communication via MCP over stdio. |
High-Performance RPC | GrpcCommunicator |
Efficient binary protocol, good for low-latency internal comms. |
Publish/Subscribe, Event-Driven Systems | MqttCommunicator |
Decoupled messaging via a broker, suitable for event notifications. |
Agents in the Same Process (Testing/Simple) | MockCommunicator (testing) or potentially a future InMemoryCommunicator |
Lowest latency, avoids network overhead. |
Communicator Configuration Options¶
Refer to the specific communicator class documentation (or source code) and the Configuration Guide for detailed options applicable to each communicator type (e.g., http_port
, grpc_port
, broker_host
, server_mode
, timeout
). These are typically set within the communicator_options
dictionary in your configuration.
MCP Method Mapping (Client Perspective)¶
When an OpenMAS agent uses an MCP communicator (McpSseCommunicator
or McpStdioCommunicator
) in client mode to interact with a remote MCP server, the standard OpenMAS communicator methods (send_request
, send_notification
) are mapped to the underlying MCP protocol actions on the remote server:
Your Agent's Call (self.communicator.<method> ) |
Remote MCP Action Triggered | Description |
---|---|---|
send_request(target_service="svc", method="tool/list") |
list_tools() |
Lists tools on the remote MCP server svc . |
send_request(target_service="svc", method="tool/call", params={"name": "calc", "arguments": {"a":1}}) |
call_tool("calc", {"a":1}) |
Calls the tool named "calc" on the remote server svc . |
send_request(target_service="svc", method="prompt/list") |
list_prompts() |
Lists prompts on the remote server svc . |
send_request(target_service="svc", method="prompt/get", params={"name": "qa", "arguments": {"q":"Hi"}}) |
get_prompt("qa", {"q":"Hi"}) |
Gets the prompt named "qa" from the remote server svc . |
send_request(target_service="svc", method="resource/list") |
list_resources() |
Lists resources on the remote server svc . |
send_request(target_service="svc", method="resource/read", params={"uri": "/data.json"}) |
read_resource("/data.json") |
Reads the resource at the specified URI from server svc . |
send_request(target_service="svc", method="some_custom_name", params={...}) |
call_tool("some_custom_name", {...}) |
By convention, non-prefixed methods often map to call_tool on the server. |
send_notification(target_service="svc", method="log_event", params={...}) |
Async call_tool("log_event", {...}) |
Sends a notification, often mapped to an asynchronous tool call on the server svc where no response is expected by the caller. |
Note: This mapping applies when your OpenMAS agent is acting as an MCP client. When your agent acts as an MCP server (using MCPServerAgent
or a communicator in server_mode=True
), incoming MCP requests trigger the methods decorated with @mcp_tool
, @mcp_prompt
, or @mcp_resource
within your agent. See the MCP Integration Guide.
Extending with Custom Protocols¶
OpenMAS allows you to create custom protocol implementations by extending the BaseCommunicator
class and potentially leveraging the communicator extension system (see communicator_extensions.md
).
Communication in OpenMAS (OLD)¶
This document provides an overview of the communication protocols in OpenMAS, explaining when and how to use each one.
Communication Overview¶
OpenMAS provides a flexible communication layer that abstracts away the complexity of different communication protocols. The core of this system is the BaseCommunicator
interface, which all protocol implementations extend.
Available Protocols¶
HTTP Communication¶
The HTTP protocol implementation uses standard HTTP requests for communication between agents and services. It's suitable for: - RESTful API interactions - Web-based service integration - Scenarios where agents run in different processes or machines
Configuration Example:
from openmas.communication import HTTPCommunicator
communicator = HTTPCommunicator(
agent_name="agent1",
service_urls={
"service1": "http://localhost:8000/service1",
"service2": "http://localhost:8001/service2"
}
)
Message Channel Protocol (MCP)¶
The Message Channel Protocol is optimized for high-performance, in-memory communication. It's ideal for: - Agents running in the same process - High-frequency message passing - Testing and development - Integration with AI models and tools via Anthropic's MCP
OpenMAS provides specialized MCP communicator implementations:
MCP Stdio Communicator¶
Specifically designed for MCP communication over standard input/output. Can operate in both client and server roles.
Client Mode Example - Connecting to an External MCP Server:
from openmas.communication.mcp import McpStdioCommunicator
communicator = McpStdioCommunicator(
agent_name="agent1",
service_urls={
"local_service": "python -m service_script.py", # Command to run a local MCP service
"external_service": "stdio:/path/to/external/executable" # Connection to external executable
}
)
Server Mode Example:
from openmas.communication.mcp import McpStdioCommunicator
communicator = McpStdioCommunicator(
agent_name="agent1",
service_urls={}, # Not used in server mode
server_mode=True,
server_instructions="This agent provides analysis tools."
)
MCP SSE Communicator¶
Uses HTTP with Server-Sent Events for MCP communication. Can operate in both client and server roles.
Client Mode Example - Connecting to External MCP Servers:
from openmas.communication.mcp import McpSseCommunicator
communicator = McpSseCommunicator(
agent_name="agent1",
service_urls={
"local_service": "http://localhost:8000/mcp", # Local MCP service
"external_service": "http://external-server.example.com:8080" # External MCP server
}
)
Server Mode Example:
from fastapi import FastAPI
from openmas.communication.mcp import McpSseCommunicator
# Optional: Create a FastAPI app (will create one if not provided)
app = FastAPI(title="MCP Agent")
communicator = McpSseCommunicator(
agent_name="agent1",
service_urls={}, # Not used in server mode
server_mode=True,
http_port=8000,
server_instructions="This agent provides analysis tools.",
app=app # Optional
)
When to Use Each Protocol¶
Protocol | Best For | Limitations |
---|---|---|
HTTP | Cross-process/cross-machine communication | Higher latency, network dependency |
MCP Stdio | MCP with subprocess or CLI | Requires subprocess management |
MCP SSE | MCP with web services | Requires HTTP infrastructure |
When to Use Each MCP Communicator¶
Communicator | Client Mode | Server Mode | Best For |
---|---|---|---|
McpStdioCommunicator | Spawn and connect to subprocess MCP services or external executables | Run as MCP server via stdin/stdout | CLI tools, subprocess integration, connecting to existing MCP tools like Stockfish |
McpSseCommunicator | Connect to HTTP-based MCP services (local or external) | Run as HTTP/SSE MCP server | Web services, UI integration, API gateway patterns, connecting to cloud-based MCP services |
Connecting to External MCP Servers¶
OpenMAS supports connecting to external MCP servers in both stdio and SSE modes:
Stdio Connection Format¶
For stdio connections to external executables, use the stdio:
protocol prefix:
service_urls = {
"stockfish": "stdio:/usr/local/bin/stockfish", # Path to Stockfish executable
"custom_tool": "stdio:/path/to/custom/tool" # Path to any MCP-compatible executable
}
When using the stdio protocol prefix, OpenMAS will execute the specified binary and communicate with it via stdin/stdout.
SSE Connection Format¶
For SSE connections to external HTTP-based MCP servers, use standard HTTP URLs:
service_urls = {
"local_service": "http://localhost:8000", # Local MCP server
"cloud_service": "https://api.example.com/mcp-server" # External cloud MCP server
}
MCP Method Mapping¶
When an OpenMAS agent uses an MCP communicator (McpSseCommunicator
or McpStdioCommunicator
) in client mode to interact with a remote MCP server, the standard OpenMAS communicator methods (send_request
, send_notification
) are mapped to the underlying MCP protocol actions on the remote server:
Your Agent's Call (self.communicator.<method> ) |
Remote MCP Action Triggered | Description |
---|---|---|
send_request(target_service="svc", method="tool/list") |
list_tools() |
Lists tools on the remote MCP server svc . |
send_request(target_service="svc", method="tool/call", params={"name": "calc", "arguments": {"a":1}}) |
call_tool("calc", {"a":1}) |
Calls the tool named "calc" on the remote server svc . |
send_request(target_service="svc", method="prompt/list") |
list_prompts() |
Lists prompts on the remote server svc . |
send_request(target_service="svc", method="prompt/get", params={"name": "qa", "arguments": {"q":"Hi"}}) |
get_prompt("qa", {"q":"Hi"}) |
Gets the prompt named "qa" from the remote server svc . |
send_request(target_service="svc", method="resource/list") |
list_resources() |
Lists resources on the remote server svc . |
send_request(target_service="svc", method="resource/read", params={"uri": "/data.json"}) |
read_resource("/data.json") |
Reads the resource at the specified URI from server svc . |
send_request(target_service="svc", method="some_custom_name", params={...}) |
call_tool("some_custom_name", {...}) |
By convention, non-prefixed methods often map to call_tool on the server. |
send_notification(target_service="svc", method="log_event", params={...}) |
Async call_tool("log_event", {...}) |
Sends a notification, often mapped to an asynchronous tool call on the server svc where no response is expected by the caller. |
Note: This mapping applies when your OpenMAS agent is acting as an MCP client. When your agent acts as an MCP server (using MCPServerAgent
or a communicator in server_mode=True
), incoming MCP requests trigger the methods decorated with @mcp_tool
, @mcp_prompt
, or @mcp_resource
within your agent.
Extending with Custom Protocols¶
OpenMAS allows you to create custom protocol implementations by extending the BaseCommunicator
class. See the developer documentation for details on implementing your own communicator.
Communicator Extension System¶
OpenMAS includes an extension system that allows developers to create and register their own communicator implementations. This system enables easy extension of OpenMAS with custom communication protocols.
Using the Extension System¶
The communicator extension system allows users to specify a communicator type in their agent configuration:
from openmas.agent import BaseAgent
from openmas.config import AgentConfig
# Configure through environment variables
# COMMUNICATOR_TYPE=mcp_stdio
# COMMUNICATOR_OPTION_SERVER_MODE=true
# Or through direct initialization with config
agent = BaseAgent(
name="my-agent",
config=AgentConfig(
name="my-agent",
communicator_type="http", # or "mcp_stdio", "mcp_sse", etc.
communicator_options={
"server_mode": True,
"http_port": 8000
}
)
)
# Or just override the communicator class directly
from openmas.communication import HttpCommunicator
agent = BaseAgent(
name="my-agent",
communicator_class=HttpCommunicator
)
Available Configuration Options¶
HTTP Communicator¶
Option | Default | Description |
---|---|---|
None currently |
MCP Stdio Communicator¶
Option | Default | Description |
---|---|---|
server_mode |
False |
Whether to run in server mode |
server_instructions |
None |
Instructions for the server |
MCP SSE Communicator¶
Option | Default | Description |
---|---|---|
server_mode |
False |
Whether to run in server mode |
http_port |
8000 |
Port for the HTTP server (server mode only) |
server_instructions |
None |
Instructions for the server |
gRPC Communicator¶
Option | Default | Description |
---|---|---|
server_mode |
False |
Whether to run in server mode |
server_address |
[::]:50051 |
Address to bind the server to (server mode only) |
max_workers |
10 |
Maximum number of server worker threads |
channel_options |
{} |
Additional gRPC channel options |
gRPC Communication¶
The gRPC communicator provides efficient, high-performance communication using Google's gRPC framework. It requires the openmas[grpc]
extra to be installed.
It's suitable for:
- High-performance, type-safe communication (if using Protobuf definitions)
- Cross-language interoperability
- Streaming scenarios
- Microservice architectures where gRPC is already in use
Configuration Example (Client Mode):
# In config (e.g., YAML or environment variables)
# COMMUNICATOR_TYPE=grpc
# SERVICE_URL_MY_GRPC_SERVICE=localhost:50051
from openmas.communication.grpc import GrpcCommunicator
# Or instantiated directly:
communicator = GrpcCommunicator(
agent_name="my-grpc-client",
service_urls={
"my_grpc_service": "localhost:50051" # Address of the target gRPC server
},
# server_mode=False # Default
)
Configuration Example (Server Mode):
# In config:
# COMMUNICATOR_TYPE=grpc
# COMMUNICATOR_OPTION_SERVER_MODE=true
# COMMUNICATOR_OPTION_GRPC_PORT=50052
from openmas.communication.grpc import GrpcCommunicator
# Or instantiated directly:
communicator = GrpcCommunicator(
agent_name="my-grpc-server",
service_urls={}, # Not needed for server
server_mode=True,
grpc_port=50052 # Port for this agent's gRPC server to listen on
)
Note: The current GrpcCommunicator
uses a generic dictionary-based request/response format over gRPC. For type safety using Protobuf definitions, you would typically implement a custom communicator or extend the existing one.
MQTT Communication¶
The MQTT communicator uses the MQTT protocol for publish/subscribe messaging, typically via an MQTT broker. It requires the openmas[mqtt]
extra.
It's suitable for:
- Event-driven architectures
- Decoupled communication where agents publish or subscribe to topics
- IoT scenarios
- Situations where a message broker is preferred
Configuration Example:
# In config:
# COMMUNICATOR_TYPE=mqtt
# COMMUNICATOR_OPTION_BROKER_HOST=mqtt.eclipseprojects.io
# COMMUNICATOR_OPTION_BROKER_PORT=1883
# SERVICE_URLS={} # Often not used directly, communication is via pub/sub
from openmas.communication.mqtt import MqttCommunicator
# Or instantiated directly:
communicator = MqttCommunicator(
agent_name="my-mqtt-agent",
service_urls={}, # Agents typically discover each other via topics
broker_host="mqtt.eclipseprojects.io", # Address of the MQTT broker
broker_port=1883,
# Optional: username="user", password="pass", client_id="custom_id"
)
Note: With MQTT, agents usually communicate via topics rather than direct service names in service_urls
. The send_request
and send_notification
methods in the MqttCommunicator
likely map target_service
and method
to specific MQTT topics based on internal conventions.
MQTT Communicator¶
Option | Default | Description |
---|---|---|
broker_host |
"localhost" |
Hostname or IP address of the MQTT broker. |
broker_port |
1883 |
Port of the MQTT broker. |
username |
None |
Username for MQTT broker authentication. |
password |
None |
Password for MQTT broker authentication. |
client_id |
Auto-generated | MQTT client ID. If empty, one is generated. |
keepalive |
60 |
MQTT keepalive interval in seconds. |
tls_enabled |
False |
Enable TLS/SSL encryption. |
tls_ca_certs |
None |
Path to CA certificate file for TLS. |
tls_certfile |
None |
Path to client certificate file for TLS. |
tls_keyfile |
None |
Path to client private key file for TLS. |