Agent Patterns in OpenMAS¶
OpenMAS provides optional modules and base classes within the `openmas.patterns` namespace to facilitate the implementation of common Multi-Agent System (MAS) design patterns. This guide highlights the patterns with built-in support and provides conceptual approaches for implementing other common workflows using core features.
Supported Patterns with Built-in Helpers¶
Orchestrator-Worker Pattern (`openmas.patterns.orchestrator`)¶
This pattern is useful for coordinating complex workflows where a central agent (Orchestrator) delegates tasks to specialized agents (Workers). OpenMAS provides dedicated base classes for this:
- `BaseOrchestratorAgent`: Manages workflow execution (e.g., using `orchestrate_workflow`), delegates tasks (`delegate_task`), aggregates results, discovers workers (`discover_workers`), and handles potential worker failures or timeouts.
- `BaseWorkerAgent`: Implements specific task logic using the `@TaskHandler` decorator. Workers typically register themselves with an orchestrator.
- Communication: Uses standard OpenMAS communication (`send_request`/`send_notification`) for task delegation and result reporting.
Benefits:
- Clear separation of concerns.
- Improved scalability and maintainability.
- Facilitates parallelism and fault tolerance in workflows.
Example Usage:
```python
import asyncio

from openmas.patterns.orchestrator import (
    BaseOrchestratorAgent,
    BaseWorkerAgent,
    TaskHandler,
)
from openmas.config import AgentConfig
from openmas.logging import configure_logging, get_logger

configure_logging(log_level="INFO")
logger = get_logger(__name__)


# Define a worker agent with task handlers
class MathWorker(BaseWorkerAgent):
    # No __init__ needed unless adding custom state

    @TaskHandler(task_type="add", description="Add two numbers")
    async def add(self, a: int, b: int) -> dict:
        logger.info(f"Worker '{self.name}' adding {a} + {b}")
        return {"result": a + b}

    @TaskHandler(task_type="multiply", description="Multiply two numbers")
    async def multiply(self, a: int, b: int) -> dict:
        logger.info(f"Worker '{self.name}' multiplying {a} * {b}")
        return {"result": a * b}


# Define an orchestrator agent
class CalculationOrchestrator(BaseOrchestratorAgent):
    # No __init__ needed unless adding custom state

    async def calculate_expression(self, a: int, b: int, c: int) -> int:
        """Calculate a + (b * c) using worker agents."""
        logger.info(f"Orchestrator calculating {a} + ({b} * {c})")
        # Define a workflow sequence
        workflow = [
            {
                "task_type": "multiply",  # Task handled by MathWorker
                "parameters": {"a": b, "b": c},
                "worker_name": "math_worker",  # Specify the target worker
            },
            {
                "task_type": "add",  # Task handled by MathWorker
                "parameters": {"a": a},  # 'b' will come from the previous result
                "include_previous_results": {"b": "result"},  # Map previous 'result' to 'b'
                "worker_name": "math_worker",
                "abort_on_failure": True,
            },
        ]
        # Execute the workflow (sequentially by default)
        results = await self.orchestrate_workflow(workflow)
        logger.info(f"Workflow results: {results}")
        # Get the final result from the second step (index 1)
        final_result_dict = results.get(1, {})
        return final_result_dict.get("result")


# --- Main application setup ---
async def main():
    # Create agents using standard BaseAgent initialization.
    # Communicators need to be compatible (e.g., HTTP).
    http_port_orch = 8000
    http_port_worker = 8001
    orch_config = AgentConfig(
        name="calc_orchestrator",
        communicator_type="http",
        service_urls={"math_worker": f"http://localhost:{http_port_worker}"},
        communicator_options={"http_port": http_port_orch},
    )
    worker_config = AgentConfig(
        name="math_worker",
        communicator_type="http",
        service_urls={"calc_orchestrator": f"http://localhost:{http_port_orch}"},
        communicator_options={"http_port": http_port_worker},
    )
    orchestrator = CalculationOrchestrator(config=orch_config)
    worker = MathWorker(config=worker_config)

    # Start agents (this runs setup and run methods)
    await orchestrator.start()
    await worker.start()

    # Worker needs to register itself with the orchestrator
    logger.info("Registering worker...")
    await worker.register_with_orchestrator(orchestrator.name)
    logger.info("Worker registered.")

    # Give a moment for registration/setup
    await asyncio.sleep(1)

    # Trigger the calculation on the orchestrator
    logger.info("Triggering calculation...")
    final_value = await orchestrator.calculate_expression(a=5, b=2, c=3)
    logger.info(f"Final Calculated Result: {final_value}")  # Should be 11

    # Clean up
    logger.info("Stopping agents...")
    await worker.stop()
    await orchestrator.stop()
    logger.info("Agents stopped.")


if __name__ == "__main__":
    asyncio.run(main())
```
(See the API documentation for `BaseOrchestratorAgent` and `BaseWorkerAgent` for more details on configuration and methods like `discover_workers`, `delegate_task`, and `orchestrate_workflow` options.)
Implementing Other Workflows using Core Features¶
Many other patterns rely on agents sending messages to each other in specific sequences or based on certain conditions. You can implement these using `BaseAgent` and `send_request`.
1. Prompt Chaining / Sequential Processing¶
In this pattern, the output of one agent becomes the input for the next agent in a sequence.
Concept: Agent A calls Agent B, Agent B processes the data and calls Agent C, and so on. The initial call might return the final result after the whole chain completes, or intermediate agents might just forward requests.
Implementation:
- Each agent in the chain needs the address of the next agent in its `service_urls` configuration.
- An agent's handler receives a request, performs its task, potentially transforms the data, and then uses `self.communicator.send_request` to call the next agent in the chain, passing the processed data.
- The final agent in the chain returns the result back up the call stack.
```python
# Conceptual Chaining Example
from openmas.agent import BaseAgent


class AgentB(BaseAgent):
    async def setup(self):
        await self.communicator.register_handler("process_b", self.handle_process)

    async def handle_process(self, data_from_a: dict) -> dict:
        # ... process data_from_a ...
        processed_data = {"b_result": data_from_a.get("a_result", "") + "_processed_by_b"}

        # Assume Agent C's logical name is 'agentC' configured in service_urls;
        # self.config.service_urls["agentC"] would contain the actual address.
        agent_c_service_name = "agentC"

        # Call Agent C
        final_result = await self.communicator.send_request(
            target_service=agent_c_service_name,
            method="process_c",
            params={"data_from_b": processed_data},
        )
        return final_result  # Return result from C


class AgentC(BaseAgent):
    async def setup(self):
        await self.communicator.register_handler("process_c", self.handle_process)

    async def handle_process(self, data_from_b: dict) -> dict:
        # ... process data_from_b ...
        final_data = {"final": data_from_b.get("b_result", "") + "_processed_by_c"}
        return final_data


# Agent A (not shown) would initiate the call to Agent B's "process_b" handler.
# Configuration (e.g., in Agent A and B) would map logical names like "agentB"
# and "agentC" to actual addresses (e.g., http://host:port).
```
2. Routing / Conditional Dispatch¶
An agent acts as a router, deciding which subsequent agent(s) to call based on the incoming request data or its internal state.
Concept: Agent R receives a request, inspects the parameters or context, and forwards the request to Agent X, Agent Y, or maybe both, based on defined rules.
Implementation:
- The Router Agent (Agent R) needs the addresses of all potential target agents (X, Y, etc.) in its `service_urls`.
- The handler in Agent R contains the routing logic (e.g., `if/elif/else` statements based on request parameters).
- Based on the logic, Agent R uses `self.communicator.send_request` to call the appropriate target agent(s) and method(s).
- The router might aggregate responses or simply return the response from the chosen target.
```python
# Conceptual Router Example
from openmas.agent import BaseAgent


class RouterAgent(BaseAgent):
    async def setup(self):
        await self.communicator.register_handler("route_request", self.handle_route)

    async def handle_route(self, request_data: dict) -> dict:
        request_type = request_data.get("type")
        payload = request_data.get("payload")

        if request_type == "image":
            # Assume logical name 'imageProcessor' is configured in service_urls
            target_service = "imageProcessor"
            target_method = "process_image"
        elif request_type == "text":
            # Assume logical name 'textAnalyzer' is configured in service_urls
            target_service = "textAnalyzer"
            target_method = "analyze_text"
        else:
            return {"error": "Unknown request type"}

        # Forward the request to the chosen service
        result = await self.communicator.send_request(
            target_service=target_service,
            method=target_method,
            params=payload,  # Pass the original payload
        )
        return result


# Other agents (imageProcessor, textAnalyzer) would define the respective
# handlers (process_image, analyze_text).
# The RouterAgent's config would map these logical names to addresses.
```
3. Parallel Execution / Fan-Out¶
An agent sends the same (or similar) request to multiple other agents simultaneously and potentially aggregates the results.
Concept: Agent P receives a task, splits it or identifies multiple relevant workers (W1, W2, W3), sends requests to all workers in parallel, waits for all responses, and combines them.
Implementation:
- The Parallelizer Agent (Agent P) needs the addresses of all potential worker agents (W1, W2, W3) in its `service_urls`.
- Agent P creates multiple `send_request` calls as `asyncio` tasks.
- It uses `asyncio.gather(*tasks)` to run these requests concurrently and wait for all of them to complete.
- It then processes the list of results returned by `asyncio.gather`.
```python
# Conceptual Parallelizer Example
import asyncio

from openmas.agent import BaseAgent


class ParallelAgent(BaseAgent):
    async def setup(self):
        await self.communicator.register_handler("process_in_parallel", self.handle_parallel)

    async def handle_parallel(self, task_data: dict) -> dict:
        # Assume the worker_services list comes from config or discovery;
        # self.config.service_urls should contain mappings for "worker1", "worker2", etc.
        worker_services = ["worker1", "worker2", "worker3"]  # Logical names

        tasks = []
        for worker_name in worker_services:
            task = self.communicator.send_request(
                target_service=worker_name,
                method="perform_subtask",  # Assume workers implement this
                params={"data": task_data},  # Send the same data to all
            )
            tasks.append(task)

        # Execute all requests concurrently;
        # return_exceptions=True allows processing partial success.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Aggregate results
        aggregated_results = []
        errors = []
        for i, res in enumerate(results):
            if isinstance(res, Exception):
                errors.append(f"Worker {worker_services[i]} failed: {res}")
            elif isinstance(res, dict) and res.get("error"):  # Handle application errors
                errors.append(f"Worker {worker_services[i]} error: {res['error']}")
            else:
                aggregated_results.append(res)

        return {"aggregated": aggregated_results, "errors": errors}


# Worker agents (worker1, worker2, etc.) would define the 'perform_subtask' handler.
# The ParallelAgent's config would map these logical names to addresses.
```
Other Patterns¶
While OpenMAS provides specific helpers for the Orchestrator-Worker pattern, many other MAS patterns can be implemented using the core `BaseAgent` and appropriate communicators:
- Publish/Subscribe: Can be effectively implemented using the `MqttCommunicator` with an MQTT broker, or by building a custom registry/dispatcher agent using standard communicators.
- Contract Net Protocol: Involves agents broadcasting task announcements, receiving bids, and awarding contracts. This can be built using `send_request` and `send_notification` with custom logic within agents to manage the bidding and awarding process.
- Service Discovery: A dedicated registry agent can be created where other agents register their services (`register_handler`) and query for available services (`send_request`). The `BaseOrchestratorAgent`'s `discover_workers` method provides a basic form of discovery for the Orchestrator-Worker pattern.
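As a framework-independent illustration of the registry/dispatcher idea behind the Publish/Subscribe bullet above, the following sketch keeps topic subscriptions in a dictionary and fans each published message out to every subscriber. In an OpenMAS deployment, each delivery would map to a `send_notification` call from a dispatcher agent to a subscriber agent; the `PubSubDispatcher` class and handler names here are illustrative, not OpenMAS APIs.

```python
import asyncio
from collections import defaultdict


class PubSubDispatcher:
    """In-process sketch of a registry/dispatcher: handlers subscribe
    per topic; publish fans the message out to all of them."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic: str, handler) -> None:
        self._subscribers[topic].append(handler)

    async def publish(self, topic: str, message: dict) -> None:
        # Deliver to every subscriber concurrently; in OpenMAS each
        # delivery would be a send_notification to a subscriber agent.
        await asyncio.gather(
            *(handler(message) for handler in self._subscribers[topic]),
            return_exceptions=True,
        )


received = []


async def on_alert(message: dict) -> None:
    received.append(message)


async def demo():
    bus = PubSubDispatcher()
    bus.subscribe("alerts", on_alert)
    bus.subscribe("alerts", on_alert)  # Two subscriptions to the same topic
    await bus.publish("alerts", {"level": "warn"})


asyncio.run(demo())
print(received)  # One delivery per subscription
```

The same shape works when the dispatcher is a real agent: `subscribe` becomes a registration handler and `publish` iterates over the stored subscriber addresses.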
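The Contract Net flow (announce, bid, award) can likewise be sketched in plain `asyncio`, with the manager logic as a single coroutine. In a real OpenMAS system the announcement would be broadcast with `send_notification` and the bid/award exchanges would be `send_request` calls between agents; `Contractor` and `contract_net` are hypothetical names used only for this sketch.

```python
import asyncio


class Contractor:
    """Stand-in for a bidding agent; a real one would estimate cost
    from the task description instead of holding a fixed value."""

    def __init__(self, name: str, cost: int):
        self.name = name
        self.cost = cost

    async def bid(self, announcement: dict) -> dict:
        return {"bidder": self.name, "cost": self.cost}

    async def execute(self, task: dict) -> dict:
        return {"done_by": self.name, "task": task["task"]}


async def contract_net(task: dict, contractors: list) -> dict:
    # 1. Announce the task and collect all bids in parallel.
    bids = await asyncio.gather(*(c.bid(task) for c in contractors))
    # 2. Award the contract to the lowest bidder.
    winner = min(bids, key=lambda b: b["cost"])
    awarded = next(c for c in contractors if c.name == winner["bidder"])
    # 3. The winner executes the task and reports the result.
    return await awarded.execute(task)


result = asyncio.run(contract_net(
    {"task": "translate"},
    [Contractor("w1", cost=5), Contractor("w2", cost=3)],
))
print(result)  # w2 wins with the lower bid
```

Timeouts for unresponsive bidders and rejection messages to losing bidders would be added in the same places the gather and award steps occur.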
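A Service Discovery registry can be reduced to two handlers: one to register a capability and one to look up a provider. In OpenMAS these would be registered via `register_handler` on a dedicated registry agent and invoked by other agents with `send_request`; the `RegistryAgent` class and handler names below are illustrative, not part of the OpenMAS API.

```python
import asyncio


class RegistryAgent:
    """Sketch of a registry: maps capability names to provider addresses."""

    def __init__(self):
        self._services = {}  # capability -> address

    async def handle_register(self, params: dict) -> dict:
        # A providing agent announces what it can do and where it lives.
        self._services[params["capability"]] = params["address"]
        return {"status": "registered"}

    async def handle_lookup(self, params: dict) -> dict:
        # A client asks who provides a capability.
        address = self._services.get(params["capability"])
        if address is None:
            return {"error": "no provider"}
        return {"address": address}


async def demo():
    registry = RegistryAgent()
    await registry.handle_register(
        {"capability": "ocr", "address": "http://localhost:9001"}
    )
    return await registry.handle_lookup({"capability": "ocr"})


found = asyncio.run(demo())
print(found)
```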
Building these patterns involves designing the interaction protocols and implementing the corresponding logic within your `BaseAgent` subclasses.
Future Enhancements¶
While these patterns are implementable today using core features, dedicated framework helpers or classes for patterns like Chaining or Routing might be added in future OpenMAS versions to further simplify their implementation.