Skip to content

OpenMAS API Reference

This document provides a reference for the key classes and methods in the OpenMAS SDK.

Agent Module

BaseAgent

from openmas.agent import BaseAgent

class MyAgent(BaseAgent):
    async def setup(self) -> None:
        """Set up the agent."""
        pass

    async def run(self) -> None:
        """Run the agent."""
        pass

    async def shutdown(self) -> None:
        """Shut down the agent."""
        pass

Key methods: - __init__(name=None, config=None, communicator_class=None, **kwargs): Initialize the agent - setup(): Set up the agent (called by start()) - run(): Run the agent (called by start()) - shutdown(): Shut down the agent (called by stop()) - start(): Start the agent (calls setup() then run()) - stop(): Stop the agent (calls shutdown()) - set_communicator(communicator): Set the agent's communicator - get_handler(method): Get a handler for the specified method - register_handler(method, handler): Register a handler for the specified method

MCP Agents

from openmas.agent import McpAgent, McpServerAgent, McpClientAgent

# Base MCP agent
class MyMcpAgent(McpAgent):
    pass

# MCP server agent
class MyServerAgent(McpServerAgent):
    pass

# MCP client agent
class MyClientAgent(McpClientAgent):
    pass

McpAgent

Key methods inherited from BaseAgent, plus: - _discover_mcp_methods(): Discover methods decorated with MCP decorators - register_with_server(server): Register the agent's MCP methods with an MCP server

McpServerAgent

Key methods: - setup_communicator(): Set up the MCP communicator (SSE or stdio) - start_server(): Start the MCP server - stop_server(): Stop the MCP server

McpClientAgent

Key methods: - connect_to_service(service_name, host, port): Connect to an MCP service - disconnect_from_service(service_name): Disconnect from an MCP service - list_tools(service_name): List tools available on a service - call_tool(service_name, tool_name, params): Call a tool on a service

MCP Decorators

from openmas.agent import mcp_tool, mcp_prompt, mcp_resource

class MyAgent(McpAgent):
    @mcp_tool(name="my_tool", description="My tool")
    async def my_tool(self, param1: str) -> dict:
        """Tool documentation."""
        return {"result": param1}

    @mcp_prompt(name="my_prompt", description="My prompt")
    async def my_prompt(self, context: str) -> str:
        """Prompt documentation."""
        return f"Context: {context}\n\nResponse:"

    @mcp_resource(uri="/resource", name="my_resource", mime_type="application/json")
    async def my_resource(self) -> bytes:
        """Resource documentation."""
        return b'{"key": "value"}'

Communication Module

Base Communicator

from openmas.communication import BaseCommunicator

# Abstract base class, not used directly

HTTP Communicator

from openmas.communication import HttpCommunicator

communicator = HttpCommunicator(
    agent_name="my-agent",
    service_urls={"other-service": "http://localhost:8000"},
    http_port=8001
)

Key methods: - start(): Start the communicator - stop(): Stop the communicator - register_handler(method, handler): Register a handler for the specified method - send_request(target_service, method, params): Send a request to a service - send_notification(target_service, method, params): Send a notification to a service

MCP Communicators

from openmas.communication.mcp import McpSseCommunicator, McpStdioCommunicator

# SSE-based MCP communicator (HTTP/SSE)
sse_communicator = McpSseCommunicator(
    agent_name="my-agent",
    service_urls={"mcp-service": "http://localhost:8000"},
    server_mode=False,
    http_port=8001
)

# Stdio-based MCP communicator (stdin/stdout)
stdio_communicator = McpStdioCommunicator(
    agent_name="my-agent",
    service_urls={},
    server_mode=True
)

Key methods (both communicators): - start(): Start the communicator - stop(): Stop the communicator - register_handler(method, handler): Register a handler for the specified method - register_mcp_methods(agent): Register the agent's MCP methods with the server

gRPC Communicator

from openmas.communication.grpc import GrpcCommunicator

grpc_communicator = GrpcCommunicator(
    agent_name="my-agent",
    service_urls={"grpc-service": "localhost:50051"},
    grpc_port=50052
)

MQTT Communicator

from openmas.communication.mqtt import MqttCommunicator

mqtt_communicator = MqttCommunicator(
    agent_name="my-agent",
    service_urls={},
    broker_host="localhost",
    broker_port=1883
)

Configuration Module

from openmas.config import load_config, AgentConfig
from pydantic import Field

# Load standard configuration
config = load_config(AgentConfig)

# Define custom configuration
class MyAgentConfig(AgentConfig):
    api_key: str = Field(..., description="API key for external service")
    model_name: str = Field("gpt-4", description="Model name to use")

# Load custom configuration
my_config = load_config(MyAgentConfig)

Key functions: - load_config(config_class): Load configuration from environment, files, etc. - find_project_root(): Find the root directory of the OpenMAS project

AgentConfig

Key fields: - name: Agent name (default: "agent") - log_level: Logging level (default: "INFO") - communicator_type: Type of communicator (default: "http") - service_urls: Dictionary of service URLs (default: {}) - communicator_options: Dictionary of options for the communicator (default: {})

Testing Module

import pytest
from openmas.testing import MockCommunicator, AgentTestHarness
from openmas.agent import BaseAgent

# Create a mock communicator
mock_communicator = MockCommunicator(agent_name="test-agent")

# Create a test harness
test_harness = AgentTestHarness(
    agent_class=BaseAgent,
    default_config={"name": "test-agent"}
)

MockCommunicator

Key methods: - expect_request(target_service, method, params, response): Expect a request and return a response - expect_request_exception(target_service, method, params, exception): Expect a request and raise an exception - expect_notification(target_service, method, params): Expect a notification - verify(): Verify that all expectations were met - trigger_handler(method, params): Trigger a handler for testing

AgentTestHarness

Key methods: - create_agent(**kwargs): Create an agent instance - running_agent(agent): Context manager for running an agent during tests - running_agents(*agents): Context manager for running multiple agents - link_agents(*agents): Link agents for in-memory communication - trigger_handler(agent, method, params): Trigger a handler on an agent - wait_for(condition, timeout, check_interval): Wait for a condition to be true - verify_all_communicators(): Verify all communicators' expectations

Logging Module

from openmas.logging import get_logger, configure_logging

# Configure logging
configure_logging(log_level="DEBUG")

# Get a logger
logger = get_logger(__name__)

# Use the logger
logger.debug("Debug message")
logger.info("Info message")
logger.warning("Warning message")
logger.error("Error message")

Key functions: - get_logger(name): Get a logger with the specified name - configure_logging(log_level, json_format): Configure logging for the application

Agent Patterns

Orchestrator-Worker pattern helpers for OpenMAS.

This module provides helper classes for implementing the Orchestrator-Worker pattern in a multi-agent system. The pattern consists of:

  1. An orchestrator agent that coordinates a workflow by delegating tasks to worker agents
  2. Worker agents that specialize in specific tasks and report results back to the orchestrator
  3. A communication mechanism for task delegation and result aggregation

This pattern is useful for decomposing complex workflows into modular components that can be executed by specialized agents, potentially in parallel.

BaseOrchestratorAgent

Bases: BaseAgent

Base orchestrator agent for coordinating tasks among worker agents.

The orchestrator is responsible for: 1. Managing the workflow of complex tasks 2. Discovering and tracking available worker agents 3. Delegating subtasks to appropriate worker agents 4. Aggregating results from workers 5. Handling failures and retries

Source code in src/openmas/patterns/orchestrator.py
class BaseOrchestratorAgent(BaseAgent):
    """Base orchestrator agent for coordinating tasks among worker agents.

    The orchestrator is responsible for:
    1. Managing the workflow of complex tasks
    2. Discovering and tracking available worker agents
    3. Delegating subtasks to appropriate worker agents
    4. Aggregating results from workers
    5. Handling failures and retries
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the orchestrator agent."""
        super().__init__(*args, **kwargs)

        # Dictionary mapping worker names to their capabilities
        self._workers: Dict[str, WorkerInfo] = {}

        # Dictionary mapping task IDs to their status and metadata
        self._tasks: Dict[str, Dict[str, Any]] = {}

        # Default timeout for worker responses
        self.default_timeout = 60.0

    async def setup(self) -> None:
        """Set up the orchestrator agent.

        Registers handlers for worker registration and task results.
        """
        await self.communicator.register_handler("register_worker", self._handle_worker_registration)
        await self.communicator.register_handler("task_result", self._handle_task_result)

    async def _handle_worker_registration(self, worker_info: Dict[str, Any]) -> Dict[str, Any]:
        """Handle worker registration requests.

        Args:
            worker_info: Information about the worker, including name and capabilities

        Returns:
            Registration confirmation
        """
        worker = WorkerInfo(**worker_info)
        self._workers[worker.name] = worker

        self.logger.info("Worker registered", worker_name=worker.name, capabilities=list(worker.capabilities))

        return {"status": "registered", "orchestrator": self.name}

    async def _handle_task_result(self, result_data: Dict[str, Any]) -> Dict[str, Any]:
        """Handle task results from workers.

        Args:
            result_data: The task result data from a worker

        Returns:
            Result acknowledgment
        """
        result = TaskResult(**result_data)

        if result.task_id not in self._tasks:
            self.logger.warning("Received result for unknown task", task_id=result.task_id)
            return {"status": "unknown_task"}

        task_info = self._tasks[result.task_id]
        task_info["status"] = result.status
        task_info["result"] = result.result
        task_info["completed_at"] = asyncio.get_event_loop().time()

        # Call the result callback if one was registered
        if "callback" in task_info and callable(task_info["callback"]):
            await task_info["callback"](result)

        self.logger.debug("Task result received", task_id=result.task_id, status=result.status)

        return {"status": "acknowledged"}

    async def discover_workers(self) -> List[WorkerInfo]:
        """Discover available worker agents.

        This method broadcasts a discovery message to find workers.

        Returns:
            List of discovered worker information
        """
        # Broadcast discovery message
        try:
            response = await self.communicator.send_request(
                target_service="broadcast",
                method="discover_workers",
                params={"orchestrator": self.name},
                timeout=5.0,
            )

            # Process responses
            for worker_data in response.get("workers", []):
                if isinstance(worker_data, dict) and "name" in worker_data:
                    worker = WorkerInfo(**worker_data)
                    self._workers[worker.name] = worker

            self.logger.info("Workers discovered", worker_count=len(self._workers), workers=list(self._workers.keys()))

        except Exception as e:
            self.logger.error("Error discovering workers", error=str(e))

        return list(self._workers.values())

    def find_worker_for_task(self, task_type: str) -> Optional[str]:
        """Find a suitable worker for a given task type.

        Args:
            task_type: The type of task to find a worker for

        Returns:
            The name of a suitable worker, or None if no worker is found
        """
        for name, info in self._workers.items():
            if task_type in info.capabilities:
                return name
        return None

    async def delegate_task(
        self,
        worker_name: str,
        task_type: str,
        parameters: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = None,
        callback: Optional[Callable[[TaskResult], Any]] = None,
    ) -> str:
        """Delegate a task to a worker agent.

        Args:
            worker_name: The name of the worker to delegate to
            task_type: The type of task to delegate
            parameters: Parameters for the task
            metadata: Additional metadata for the task
            timeout: Timeout for the task in seconds
            callback: Callback function to call when the task completes

        Returns:
            The ID of the delegated task

        Raises:
            ValueError: If the worker is not registered
        """
        if worker_name not in self._workers:
            raise ValueError(f"Worker '{worker_name}' is not registered")

        # Create the task request
        task_request = TaskRequest(task_type=task_type, parameters=parameters or {}, metadata=metadata or {})

        # Store task information
        self._tasks[task_request.task_id] = {
            "worker": worker_name,
            "task_type": task_type,
            "status": "pending",
            "created_at": asyncio.get_event_loop().time(),
            "timeout": timeout or self.default_timeout,
            "callback": callback,
        }

        # Send the task to the worker
        await self.communicator.send_notification(
            target_service=worker_name, method="execute_task", params=task_request.model_dump()
        )

        self.logger.debug("Task delegated", task_id=task_request.task_id, worker=worker_name, task_type=task_type)

        return task_request.task_id

    async def get_task_result(self, task_id: str, timeout: Optional[float] = None) -> Optional[TaskResult]:
        """Get the result of a task.

        Args:
            task_id: The ID of the task
            timeout: How long to wait for the result in seconds

        Returns:
            The task result, or None if the task is not found or times out
        """
        if task_id not in self._tasks:
            return None

        task_info = self._tasks[task_id]

        # If the task is already completed, return the result
        if task_info["status"] in ("success", "failure"):
            return TaskResult(
                task_id=task_id,
                status=task_info["status"],
                result=task_info.get("result"),
                error=task_info.get("error"),
            )

        # Wait for the result with timeout
        timeout_value = timeout or task_info["timeout"]
        start_time = asyncio.get_event_loop().time()

        while (asyncio.get_event_loop().time() - start_time) < timeout_value:
            # Check if the task has completed
            if task_info["status"] in ("success", "failure"):
                return TaskResult(
                    task_id=task_id,
                    status=task_info["status"],
                    result=task_info.get("result"),
                    error=task_info.get("error"),
                )

            # Wait a bit before checking again
            await asyncio.sleep(0.1)

        # Timeout occurred
        return TaskResult(task_id=task_id, status="timeout", error="Task timed out")

    async def orchestrate_workflow(
        self, tasks: List[Dict[str, Any]], parallel: bool = False
    ) -> Dict[int, Dict[str, Any]]:
        """Orchestrate a workflow of multiple tasks.

        Args:
            tasks: List of task definitions, each containing:
                - task_type: The type of task
                - parameters: Parameters for the task (optional)
                - worker: Specific worker to use (optional)
            parallel: Whether to execute tasks in parallel

        Returns:
            Dictionary mapping task positions or IDs to results
        """
        results: Dict[int, Dict[str, Any]] = {}

        if parallel:
            # Execute tasks in parallel
            task_futures = []
            for i, task_def in enumerate(tasks):
                worker = task_def.get("worker") or self.find_worker_for_task(task_def["task_type"])
                if not worker:
                    results[i] = {
                        "status": "failure",
                        "error": f"No worker found for task type: {task_def['task_type']}",
                    }
                    continue

                task_id = await self.delegate_task(
                    worker_name=worker, task_type=task_def["task_type"], parameters=task_def.get("parameters", {})
                )

                # Create a future for this task result
                task_futures.append((i, task_id))

            # Wait for all task results
            for i, task_id in task_futures:
                result = await self.get_task_result(task_id)
                results[i] = (
                    result.model_dump() if result else {"status": "failure", "error": "Failed to get task result"}
                )
        else:
            # Execute tasks sequentially
            for i, task_def in enumerate(tasks):
                worker = task_def.get("worker") or self.find_worker_for_task(task_def["task_type"])
                if not worker:
                    results[i] = {
                        "status": "failure",
                        "error": f"No worker found for task type: {task_def['task_type']}",
                    }
                    continue

                # Add results from previous tasks if requested
                parameters = task_def.get("parameters", {}).copy()
                if task_def.get("include_previous_results", False):
                    parameters["previous_results"] = results

                task_id = await self.delegate_task(
                    worker_name=worker, task_type=task_def["task_type"], parameters=parameters
                )

                result = await self.get_task_result(task_id)
                results[i] = (
                    result.model_dump() if result else {"status": "failure", "error": "Failed to get task result"}
                )

                # Stop the workflow if a task fails and abort_on_failure is set
                if (result is None or result.status != "success") and task_def.get("abort_on_failure", False):
                    break

        return results

__init__(*args, **kwargs)

Initialize the orchestrator agent.

Source code in src/openmas/patterns/orchestrator.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the orchestrator agent."""
    super().__init__(*args, **kwargs)

    # Dictionary mapping worker names to their capabilities
    self._workers: Dict[str, WorkerInfo] = {}

    # Dictionary mapping task IDs to their status and metadata
    self._tasks: Dict[str, Dict[str, Any]] = {}

    # Default timeout for worker responses
    self.default_timeout = 60.0

delegate_task(worker_name, task_type, parameters=None, metadata=None, timeout=None, callback=None) async

Delegate a task to a worker agent.

Parameters:

Name Type Description Default
worker_name str

The name of the worker to delegate to

required
task_type str

The type of task to delegate

required
parameters Optional[Dict[str, Any]]

Parameters for the task

None
metadata Optional[Dict[str, Any]]

Additional metadata for the task

None
timeout Optional[float]

Timeout for the task in seconds

None
callback Optional[Callable[[TaskResult], Any]]

Callback function to call when the task completes

None

Returns:

Type Description
str

The ID of the delegated task

Raises:

Type Description
ValueError

If the worker is not registered

Source code in src/openmas/patterns/orchestrator.py
async def delegate_task(
    self,
    worker_name: str,
    task_type: str,
    parameters: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
    timeout: Optional[float] = None,
    callback: Optional[Callable[[TaskResult], Any]] = None,
) -> str:
    """Delegate a task to a worker agent.

    Args:
        worker_name: The name of the worker to delegate to
        task_type: The type of task to delegate
        parameters: Parameters for the task
        metadata: Additional metadata for the task
        timeout: Timeout for the task in seconds
        callback: Callback function to call when the task completes

    Returns:
        The ID of the delegated task

    Raises:
        ValueError: If the worker is not registered
    """
    if worker_name not in self._workers:
        raise ValueError(f"Worker '{worker_name}' is not registered")

    # Create the task request
    task_request = TaskRequest(task_type=task_type, parameters=parameters or {}, metadata=metadata or {})

    # Store task information
    self._tasks[task_request.task_id] = {
        "worker": worker_name,
        "task_type": task_type,
        "status": "pending",
        "created_at": asyncio.get_event_loop().time(),
        "timeout": timeout or self.default_timeout,
        "callback": callback,
    }

    # Send the task to the worker
    await self.communicator.send_notification(
        target_service=worker_name, method="execute_task", params=task_request.model_dump()
    )

    self.logger.debug("Task delegated", task_id=task_request.task_id, worker=worker_name, task_type=task_type)

    return task_request.task_id

discover_workers() async

Discover available worker agents.

This method broadcasts a discovery message to find workers.

Returns:

Type Description
List[WorkerInfo]

List of discovered worker information

Source code in src/openmas/patterns/orchestrator.py
async def discover_workers(self) -> List[WorkerInfo]:
    """Discover available worker agents.

    This method broadcasts a discovery message to find workers.

    Returns:
        List of discovered worker information
    """
    # Broadcast discovery message
    try:
        response = await self.communicator.send_request(
            target_service="broadcast",
            method="discover_workers",
            params={"orchestrator": self.name},
            timeout=5.0,
        )

        # Process responses
        for worker_data in response.get("workers", []):
            if isinstance(worker_data, dict) and "name" in worker_data:
                worker = WorkerInfo(**worker_data)
                self._workers[worker.name] = worker

        self.logger.info("Workers discovered", worker_count=len(self._workers), workers=list(self._workers.keys()))

    except Exception as e:
        self.logger.error("Error discovering workers", error=str(e))

    return list(self._workers.values())

find_worker_for_task(task_type)

Find a suitable worker for a given task type.

Parameters:

Name Type Description Default
task_type str

The type of task to find a worker for

required

Returns:

Type Description
Optional[str]

The name of a suitable worker, or None if no worker is found

Source code in src/openmas/patterns/orchestrator.py
def find_worker_for_task(self, task_type: str) -> Optional[str]:
    """Find a suitable worker for a given task type.

    Args:
        task_type: The type of task to find a worker for

    Returns:
        The name of a suitable worker, or None if no worker is found
    """
    for name, info in self._workers.items():
        if task_type in info.capabilities:
            return name
    return None

get_task_result(task_id, timeout=None) async

Get the result of a task.

Parameters:

Name Type Description Default
task_id str

The ID of the task

required
timeout Optional[float]

How long to wait for the result in seconds

None

Returns:

Type Description
Optional[TaskResult]

The task result, or None if the task is not found or times out

Source code in src/openmas/patterns/orchestrator.py
async def get_task_result(self, task_id: str, timeout: Optional[float] = None) -> Optional[TaskResult]:
    """Get the result of a task.

    Args:
        task_id: The ID of the task
        timeout: How long to wait for the result in seconds

    Returns:
        The task result, or None if the task is not found or times out
    """
    if task_id not in self._tasks:
        return None

    task_info = self._tasks[task_id]

    # If the task is already completed, return the result
    if task_info["status"] in ("success", "failure"):
        return TaskResult(
            task_id=task_id,
            status=task_info["status"],
            result=task_info.get("result"),
            error=task_info.get("error"),
        )

    # Wait for the result with timeout
    timeout_value = timeout or task_info["timeout"]
    start_time = asyncio.get_event_loop().time()

    while (asyncio.get_event_loop().time() - start_time) < timeout_value:
        # Check if the task has completed
        if task_info["status"] in ("success", "failure"):
            return TaskResult(
                task_id=task_id,
                status=task_info["status"],
                result=task_info.get("result"),
                error=task_info.get("error"),
            )

        # Wait a bit before checking again
        await asyncio.sleep(0.1)

    # Timeout occurred
    return TaskResult(task_id=task_id, status="timeout", error="Task timed out")

orchestrate_workflow(tasks, parallel=False) async

Orchestrate a workflow of multiple tasks.

Parameters:

Name Type Description Default
tasks List[Dict[str, Any]]

List of task definitions, each containing: - task_type: The type of task - parameters: Parameters for the task (optional) - worker: Specific worker to use (optional)

required
parallel bool

Whether to execute tasks in parallel

False

Returns:

Type Description
Dict[int, Dict[str, Any]]

Dictionary mapping task positions or IDs to results

Source code in src/openmas/patterns/orchestrator.py
async def orchestrate_workflow(
    self, tasks: List[Dict[str, Any]], parallel: bool = False
) -> Dict[int, Dict[str, Any]]:
    """Orchestrate a workflow of multiple tasks.

    Args:
        tasks: List of task definitions, each containing:
            - task_type: The type of task
            - parameters: Parameters for the task (optional)
            - worker: Specific worker to use (optional)
        parallel: Whether to execute tasks in parallel

    Returns:
        Dictionary mapping task positions or IDs to results
    """
    results: Dict[int, Dict[str, Any]] = {}

    if parallel:
        # Execute tasks in parallel
        task_futures = []
        for i, task_def in enumerate(tasks):
            worker = task_def.get("worker") or self.find_worker_for_task(task_def["task_type"])
            if not worker:
                results[i] = {
                    "status": "failure",
                    "error": f"No worker found for task type: {task_def['task_type']}",
                }
                continue

            task_id = await self.delegate_task(
                worker_name=worker, task_type=task_def["task_type"], parameters=task_def.get("parameters", {})
            )

            # Create a future for this task result
            task_futures.append((i, task_id))

        # Wait for all task results
        for i, task_id in task_futures:
            result = await self.get_task_result(task_id)
            results[i] = (
                result.model_dump() if result else {"status": "failure", "error": "Failed to get task result"}
            )
    else:
        # Execute tasks sequentially
        for i, task_def in enumerate(tasks):
            worker = task_def.get("worker") or self.find_worker_for_task(task_def["task_type"])
            if not worker:
                results[i] = {
                    "status": "failure",
                    "error": f"No worker found for task type: {task_def['task_type']}",
                }
                continue

            # Add results from previous tasks if requested
            parameters = task_def.get("parameters", {}).copy()
            if task_def.get("include_previous_results", False):
                parameters["previous_results"] = results

            task_id = await self.delegate_task(
                worker_name=worker, task_type=task_def["task_type"], parameters=parameters
            )

            result = await self.get_task_result(task_id)
            results[i] = (
                result.model_dump() if result else {"status": "failure", "error": "Failed to get task result"}
            )

            # Stop the workflow if a task fails and abort_on_failure is set
            if (result is None or result.status != "success") and task_def.get("abort_on_failure", False):
                break

    return results

setup() async

Set up the orchestrator agent.

Registers handlers for worker registration and task results.

Source code in src/openmas/patterns/orchestrator.py
async def setup(self) -> None:
    """Set up the orchestrator agent.

    Registers handlers for worker registration and task results.
    """
    await self.communicator.register_handler("register_worker", self._handle_worker_registration)
    await self.communicator.register_handler("task_result", self._handle_task_result)

BaseWorkerAgent

Bases: BaseAgent

Base worker agent for processing specialized tasks.

Workers register with orchestrators, receive task assignments, process them according to their capabilities, and return results.

Source code in src/openmas/patterns/orchestrator.py
class BaseWorkerAgent(BaseAgent):
    """Base worker agent for processing specialized tasks.

    Workers register with orchestrators, receive task assignments,
    process them according to their capabilities, and return results.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the worker agent."""
        super().__init__(*args, **kwargs)

        # Dictionary mapping task types to handler methods
        self._task_handlers: Dict[str, Callable] = {}

        # Set of orchestrators this worker is registered with
        self._orchestrators: Set[str] = set()

        # Dict of active tasks being processed
        self._active_tasks: Dict[str, Dict[str, Any]] = {}

    async def setup(self) -> None:
        """Set up the worker agent.

        Discovers and registers task handlers, registers with orchestrators,
        and sets up communication handlers.
        """
        # Discover task handlers from class methods
        self._discover_task_handlers()

        # Register handler for executing tasks
        await self.communicator.register_handler("execute_task", self._handle_execute_task)

        # Register handler for discovery requests
        await self.communicator.register_handler("discover_workers", self._handle_discovery)

    def _discover_task_handlers(self) -> None:
        """Discover task handlers from class methods."""
        for attr_name in dir(self):
            attr = getattr(self, attr_name)
            if callable(attr) and hasattr(attr, "_task_handler"):
                task_info = getattr(attr, "_task_handler")
                if isinstance(task_info, dict) and "task_type" in task_info:
                    self._task_handlers[task_info["task_type"]] = attr
                    self.logger.debug("Registered task handler", task_type=task_info["task_type"], handler=attr_name)

    async def register_with_orchestrator(self, orchestrator_name: str) -> bool:
        """Register this worker with an orchestrator.

        Args:
            orchestrator_name: The name of the orchestrator to register with

        Returns:
            True if registration was successful, False otherwise
        """
        try:
            response = await self.communicator.send_request(
                target_service=orchestrator_name,
                method="register_worker",
                params={
                    "name": self.name,
                    "capabilities": list(self._task_handlers.keys()),
                    "metadata": {"agent_type": self.__class__.__name__},
                },
            )

            if response.get("status") == "registered":
                self._orchestrators.add(orchestrator_name)
                self.logger.info("Registered with orchestrator", orchestrator=orchestrator_name)
                return True

            self.logger.warning(
                "Failed to register with orchestrator", orchestrator=orchestrator_name, response=response
            )
            return False

        except Exception as e:
            self.logger.error("Error registering with orchestrator", orchestrator=orchestrator_name, error=str(e))
            return False

    async def _handle_discovery(self, discovery_request: Dict[str, Any]) -> Dict[str, Any]:
        """Handle worker discovery requests from orchestrators.

        Args:
            discovery_request: The discovery request

        Returns:
            Worker information
        """
        orchestrator = discovery_request.get("orchestrator")
        if orchestrator and orchestrator not in self._orchestrators:
            self._orchestrators.add(orchestrator)
            self.logger.info("Added orchestrator from discovery", orchestrator=orchestrator)

        return {
            "name": self.name,
            "capabilities": list(self._task_handlers.keys()),
            "metadata": {"agent_type": self.__class__.__name__},
        }

    async def _handle_execute_task(self, task_request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Handle a task execution request from an orchestrator.

        Args:
            task_request_data: The task request data

        Returns:
            Acknowledgment of the task request
        """
        task_request = TaskRequest(**task_request_data)

        # Check if we have a handler for this task type
        if task_request.task_type not in self._task_handlers:
            self.logger.warning(
                "Received task with no handler", task_id=task_request.task_id, task_type=task_request.task_type
            )

            # Send failure result back to the orchestrator
            await self._send_task_result(
                task_id=task_request.task_id,
                status="failure",
                error=f"No handler for task type: {task_request.task_type}",
            )
            return {"status": "rejected", "reason": "no_handler"}

        # Store the task in active tasks
        self._active_tasks[task_request.task_id] = {
            "task_type": task_request.task_type,
            "parameters": task_request.parameters,
            "metadata": task_request.metadata,
            "status": "in_progress",
            "started_at": asyncio.get_event_loop().time(),
        }

        # Execute the task in the background
        asyncio.create_task(self._execute_task(task_request))

        return {"status": "accepted"}

    async def _execute_task(self, task_request: TaskRequest) -> None:
        """Execute a task in the background.

        Args:
            task_request: The task request to execute
        """
        handler = self._task_handlers[task_request.task_type]

        try:
            # Execute the handler
            result = await handler(**task_request.parameters)

            # Send success result
            await self._send_task_result(task_id=task_request.task_id, status="success", result=result)

        except Exception as e:
            self.logger.exception(
                "Error executing task", task_id=task_request.task_id, task_type=task_request.task_type, error=str(e)
            )

            # Send failure result
            await self._send_task_result(task_id=task_request.task_id, status="failure", error=str(e))

        # Remove from active tasks
        if task_request.task_id in self._active_tasks:
            del self._active_tasks[task_request.task_id]

    async def _send_task_result(
        self, task_id: str, status: str, result: Any = None, error: Optional[str] = None
    ) -> None:
        """Send a task result back to the orchestrator.

        Args:
            task_id: The ID of the task
            status: The status of the task ("success", "failure")
            result: The result of the task
            error: Error message if the task failed
        """
        task_info = self._active_tasks.get(task_id, {})
        orchestrator = task_info.get("metadata", {}).get("orchestrator")

        # If we don't know which orchestrator to send to, send to all
        if not orchestrator:
            for orch in self._orchestrators:
                await self._send_result_to_orchestrator(
                    orchestrator=orch, task_id=task_id, status=status, result=result, error=error
                )
        else:
            await self._send_result_to_orchestrator(
                orchestrator=orchestrator, task_id=task_id, status=status, result=result, error=error
            )

    async def _send_result_to_orchestrator(
        self, orchestrator: str, task_id: str, status: str, result: Any = None, error: Optional[str] = None
    ) -> None:
        """Send a task result to a specific orchestrator.

        Args:
            orchestrator: The name of the orchestrator
            task_id: The ID of the task
            status: The status of the task
            result: The result of the task
            error: Error message if the task failed
        """
        task_result = TaskResult(
            task_id=task_id, status=status, result=result, error=error, metadata={"worker": self.name}
        )

        try:
            await self.communicator.send_notification(
                target_service=orchestrator, method="task_result", params=task_result.model_dump()
            )

            self.logger.debug("Sent task result", task_id=task_id, orchestrator=orchestrator, status=status)

        except Exception as e:
            self.logger.error("Error sending task result", task_id=task_id, orchestrator=orchestrator, error=str(e))

__init__(*args, **kwargs)

Initialize the worker agent.

Source code in src/openmas/patterns/orchestrator.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the worker agent."""
    super().__init__(*args, **kwargs)

    # Dictionary mapping task types to handler methods
    self._task_handlers: Dict[str, Callable] = {}

    # Set of orchestrators this worker is registered with
    self._orchestrators: Set[str] = set()

    # Dict of active tasks being processed
    self._active_tasks: Dict[str, Dict[str, Any]] = {}

register_with_orchestrator(orchestrator_name) async

Register this worker with an orchestrator.

Parameters:

Name Type Description Default
orchestrator_name str

The name of the orchestrator to register with

required

Returns:

Type Description
bool

True if registration was successful, False otherwise

Source code in src/openmas/patterns/orchestrator.py
async def register_with_orchestrator(self, orchestrator_name: str) -> bool:
    """Register this worker with an orchestrator.

    Args:
        orchestrator_name: The name of the orchestrator to register with

    Returns:
        True if registration was successful, False otherwise
    """
    try:
        response = await self.communicator.send_request(
            target_service=orchestrator_name,
            method="register_worker",
            params={
                "name": self.name,
                "capabilities": list(self._task_handlers.keys()),
                "metadata": {"agent_type": self.__class__.__name__},
            },
        )

        if response.get("status") == "registered":
            self._orchestrators.add(orchestrator_name)
            self.logger.info("Registered with orchestrator", orchestrator=orchestrator_name)
            return True

        self.logger.warning(
            "Failed to register with orchestrator", orchestrator=orchestrator_name, response=response
        )
        return False

    except Exception as e:
        self.logger.error("Error registering with orchestrator", orchestrator=orchestrator_name, error=str(e))
        return False

setup() async

Set up the worker agent.

Discovers and registers task handlers, registers with orchestrators, and sets up communication handlers.

Source code in src/openmas/patterns/orchestrator.py
async def setup(self) -> None:
    """Set up the worker agent.

    Discovers and registers task handlers, registers with orchestrators,
    and sets up communication handlers.
    """
    # Discover task handlers from class methods
    self._discover_task_handlers()

    # Register handler for executing tasks
    await self.communicator.register_handler("execute_task", self._handle_execute_task)

    # Register handler for discovery requests
    await self.communicator.register_handler("discover_workers", self._handle_discovery)

TaskHandler

A decorator for registering task handlers in worker agents.

Source code in src/openmas/patterns/orchestrator.py
class TaskHandler:
    """A decorator for registering task handlers in worker agents."""

    def __init__(self, task_type: str, description: str = ""):
        """Initialize the task handler decorator.

        Args:
            task_type: The type of task this handler can process
            description: A description of the task handler
        """
        self.task_type = task_type
        self.description = description

    def __call__(self, func: Callable) -> Callable:
        """Decorate a method as a task handler.

        Args:
            func: The method to decorate

        Returns:
            The decorated method
        """
        setattr(func, "_task_handler", {"task_type": self.task_type, "description": self.description})
        return func

__call__(func)

Decorate a method as a task handler.

Parameters:

Name Type Description Default
func Callable

The method to decorate

required

Returns:

Type Description
Callable

The decorated method

Source code in src/openmas/patterns/orchestrator.py
def __call__(self, func: Callable) -> Callable:
    """Decorate a method as a task handler.

    Args:
        func: The method to decorate

    Returns:
        The decorated method
    """
    setattr(func, "_task_handler", {"task_type": self.task_type, "description": self.description})
    return func

__init__(task_type, description='')

Initialize the task handler decorator.

Parameters:

Name Type Description Default
task_type str

The type of task this handler can process

required
description str

A description of the task handler

''
Source code in src/openmas/patterns/orchestrator.py
def __init__(self, task_type: str, description: str = ""):
    """Initialize the task handler decorator.

    Args:
        task_type: The type of task this handler can process
        description: A description of the task handler
    """
    self.task_type = task_type
    self.description = description

TaskRequest

Bases: BaseModel

A task request sent from an orchestrator to a worker.

Source code in src/openmas/patterns/orchestrator.py
class TaskRequest(BaseModel):
    """A task request sent from an orchestrator to a worker."""

    task_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    task_type: str
    parameters: Dict[str, Any] = Field(default_factory=dict)
    metadata: Dict[str, Any] = Field(default_factory=dict)

TaskResult

Bases: BaseModel

A task result sent from a worker to an orchestrator.

Source code in src/openmas/patterns/orchestrator.py
class TaskResult(BaseModel):
    """A task result sent from a worker to an orchestrator."""

    task_id: str
    status: str  # "success", "failure", "in_progress"
    result: Optional[Any] = None
    error: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)

WorkerInfo

Bases: BaseModel

Information about a worker agent.

Source code in src/openmas/patterns/orchestrator.py
class WorkerInfo(BaseModel):
    """Information about a worker agent."""

    name: str
    capabilities: Set[str] = Field(default_factory=set)
    metadata: Dict[str, Any] = Field(default_factory=dict)

Chaining pattern helpers for OpenMAS.

This module provides helper classes and functions for implementing the Chaining pattern in a multi-agent system. The pattern consists of:

  1. A sequence of service calls that are executed in order
  2. Results from earlier calls can be passed to later calls
  3. Error handling and optional retry mechanisms

This pattern is useful when a workflow needs to execute a series of steps in a defined order, where each step may depend on the result of previous steps.

ServiceChain

A chain of service calls that can be executed sequentially.

The ServiceChain allows defining a sequence of API calls to different services, with the ability to pass data between steps, transform inputs/outputs, apply conditions, retry logic, and error handling.

Source code in src/openmas/patterns/chaining.py
class ServiceChain:
    """A chain of service calls that can be executed sequentially.

    The ServiceChain allows defining a sequence of API calls to different services,
    with the ability to pass data between steps, transform inputs/outputs, apply
    conditions, retry logic, and error handling.
    """

    def __init__(self, communicator: Any, name: str = "service_chain"):
        """Initialize the ServiceChain.

        Args:
            communicator: The communicator to use for service calls
            name: Name of this chain for logging purposes
        """
        self.communicator = communicator
        self.name = name
        self.steps: List[ChainStep] = []
        self.logger = logger.bind(chain_name=name)

    def add_step(
        self,
        target_service: str,
        method: str,
        parameters: Optional[Dict[str, Any]] = None,
        name: Optional[str] = None,
        retry_count: int = 0,
        retry_delay: float = 1.0,
        timeout: Optional[float] = None,
        condition: Optional[Callable[[Dict[str, Any]], bool]] = None,
        transform_input: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
        transform_output: Optional[Callable[[Any], Any]] = None,
        error_handler: Optional[Callable[[Exception, Dict[str, Any]], Any]] = None,
    ) -> "ServiceChain":
        """Add a step to the chain.

        Args:
            target_service: The target service for this step
            method: The method to call on the service
            parameters: Parameters to pass to the method
            name: Optional name for this step
            retry_count: Number of times to retry on failure
            retry_delay: Delay between retries in seconds
            timeout: Timeout for this step in seconds
            condition: Optional condition function to determine if this step should execute
            transform_input: Optional function to transform input parameters
            transform_output: Optional function to transform the output
            error_handler: Optional function to handle errors

        Returns:
            The chain instance for method chaining
        """
        step = ChainStep(
            target_service=target_service,
            method=method,
            parameters=parameters or {},
            name=name or f"{target_service}.{method}",
            retry_count=retry_count,
            retry_delay=retry_delay,
            timeout=timeout,
            condition=condition,
            transform_input=transform_input,
            transform_output=transform_output,
            error_handler=error_handler,
        )
        self.steps.append(step)
        return self

    async def execute(self, initial_context: Optional[Dict[str, Any]] = None) -> ChainResult:
        """Execute the chain of service calls.

        Args:
            initial_context: Optional initial context data

        Returns:
            Result of the chain execution
        """
        context = initial_context or {}
        chain_result = ChainResult()

        # Track execution time
        start_time = asyncio.get_event_loop().time()

        # Execute steps sequentially
        for step in self.steps:
            step_result = await self._execute_step(step, context)
            chain_result.results.append(step_result)

            # Add result to context for next steps
            if step.name is not None:
                context[step.name] = step_result.result

            # Exit early if a step failed and no error handler recovered
            if step_result.status == ChainStepStatus.FAILURE:
                chain_result.successful = False
                break

        # Calculate total execution time
        chain_result.execution_time = asyncio.get_event_loop().time() - start_time

        # Set the final result to the result of the last successful step
        for step_result in reversed(chain_result.results):
            if step_result.status == ChainStepStatus.SUCCESS:
                chain_result.final_result = step_result.result
                break

        return chain_result

    async def _execute_step(self, step: ChainStep, context: Dict[str, Any]) -> ChainStepResult:
        """Execute a single step in the chain.

        Args:
            step: The step to execute
            context: The current context with results from previous steps

        Returns:
            Result of the step execution
        """
        result = ChainStepResult(step=step, status=ChainStepStatus.PENDING)

        # Check condition
        if step.condition is not None and not step.condition(context):
            result.status = ChainStepStatus.SKIPPED
            self.logger.info(f"Step {step.name} skipped due to condition", step=step.name)
            return result

        # Track execution time
        start_time = asyncio.get_event_loop().time()
        result.status = ChainStepStatus.IN_PROGRESS

        # Prepare parameters with context
        parameters = self._prepare_parameters(step, context)

        # Execute with retry logic
        attempt = 0
        while True:
            attempt += 1
            result.attempt_count = attempt

            try:
                response = await self.communicator.send_request(
                    target_service=step.target_service,
                    method=step.method,
                    params=parameters,
                    timeout=step.timeout,
                )

                # Process successful response
                if step.transform_output:
                    response = step.transform_output(response)

                result.result = response
                result.status = ChainStepStatus.SUCCESS
                result.execution_time = asyncio.get_event_loop().time() - start_time

                self.logger.debug(
                    f"Step {step.name} executed successfully",
                    step=step.name,
                    attempt=attempt,
                    execution_time=result.execution_time,
                )
                break

            except Exception as e:
                # Handle error
                if step.error_handler:
                    try:
                        # Try to recover with the error handler
                        recovery_result = step.error_handler(e, context)
                        result.result = recovery_result
                        result.status = ChainStepStatus.SUCCESS
                        result.execution_time = asyncio.get_event_loop().time() - start_time

                        self.logger.info(
                            f"Step {step.name} recovered from error with handler",
                            step=step.name,
                            error=str(e),
                            attempt=attempt,
                        )
                        break
                    except Exception as recovery_error:
                        # Error handler failed
                        self.logger.warning(
                            f"Error handler for step {step.name} failed",
                            step=step.name,
                            error=str(recovery_error),
                        )

                # Check if we should retry
                if attempt <= step.retry_count:
                    self.logger.info(
                        f"Retrying step {step.name} after error (attempt {attempt}/{step.retry_count})",
                        step=step.name,
                        error=str(e),
                        attempt=attempt,
                        retry_delay=step.retry_delay,
                    )
                    await asyncio.sleep(step.retry_delay)
                    continue

                # No more retries, mark as failed
                result.status = ChainStepStatus.FAILURE
                result.error = str(e)
                result.execution_time = asyncio.get_event_loop().time() - start_time

                self.logger.error(
                    f"Step {step.name} failed after {attempt} attempts",
                    step=step.name,
                    error=str(e),
                    attempt=attempt,
                )
                break

        return result

    def _prepare_parameters(self, step: ChainStep, context: Dict[str, Any]) -> Dict[str, Any]:
        """Prepare parameters for a step, incorporating context data.

        Args:
            step: The step being executed
            context: The current context with results from previous steps

        Returns:
            The prepared parameters
        """
        # Start with the step's defined parameters
        parameters = step.parameters.copy()

        # Look for placeholders in the parameters to substitute with context values
        for key, value in parameters.items():
            if isinstance(value, str) and value.startswith("$"):
                context_key = value[1:]
                if context_key in context:
                    parameters[key] = context[context_key]

        # Apply transform_input if defined
        if step.transform_input:
            parameters = step.transform_input(context)

        return parameters

__init__(communicator, name='service_chain')

Initialize the ServiceChain.

Parameters:

Name Type Description Default
communicator Any

The communicator to use for service calls

required
name str

Name of this chain for logging purposes

'service_chain'
Source code in src/openmas/patterns/chaining.py
def __init__(self, communicator: Any, name: str = "service_chain"):
    """Initialize the ServiceChain.

    Args:
        communicator: The communicator to use for service calls
        name: Name of this chain for logging purposes
    """
    self.communicator = communicator
    self.name = name
    self.steps: List[ChainStep] = []
    self.logger = logger.bind(chain_name=name)

add_step(target_service, method, parameters=None, name=None, retry_count=0, retry_delay=1.0, timeout=None, condition=None, transform_input=None, transform_output=None, error_handler=None)

Add a step to the chain.

Parameters:

Name Type Description Default
target_service str

The target service for this step

required
method str

The method to call on the service

required
parameters Optional[Dict[str, Any]]

Parameters to pass to the method

None
name Optional[str]

Optional name for this step

None
retry_count int

Number of times to retry on failure

0
retry_delay float

Delay between retries in seconds

1.0
timeout Optional[float]

Timeout for this step in seconds

None
condition Optional[Callable[[Dict[str, Any]], bool]]

Optional condition function to determine if this step should execute

None
transform_input Optional[Callable[[Dict[str, Any]], Dict[str, Any]]]

Optional function to transform input parameters

None
transform_output Optional[Callable[[Any], Any]]

Optional function to transform the output

None
error_handler Optional[Callable[[Exception, Dict[str, Any]], Any]]

Optional function to handle errors

None

Returns:

Type Description
ServiceChain

The chain instance for method chaining

Source code in src/openmas/patterns/chaining.py
def add_step(
    self,
    target_service: str,
    method: str,
    parameters: Optional[Dict[str, Any]] = None,
    name: Optional[str] = None,
    retry_count: int = 0,
    retry_delay: float = 1.0,
    timeout: Optional[float] = None,
    condition: Optional[Callable[[Dict[str, Any]], bool]] = None,
    transform_input: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
    transform_output: Optional[Callable[[Any], Any]] = None,
    error_handler: Optional[Callable[[Exception, Dict[str, Any]], Any]] = None,
) -> "ServiceChain":
    """Add a step to the chain.

    Args:
        target_service: The target service for this step
        method: The method to call on the service
        parameters: Parameters to pass to the method
        name: Optional name for this step
        retry_count: Number of times to retry on failure
        retry_delay: Delay between retries in seconds
        timeout: Timeout for this step in seconds
        condition: Optional condition function to determine if this step should execute
        transform_input: Optional function to transform input parameters
        transform_output: Optional function to transform the output
        error_handler: Optional function to handle errors

    Returns:
        The chain instance for method chaining
    """
    step = ChainStep(
        target_service=target_service,
        method=method,
        parameters=parameters or {},
        name=name or f"{target_service}.{method}",
        retry_count=retry_count,
        retry_delay=retry_delay,
        timeout=timeout,
        condition=condition,
        transform_input=transform_input,
        transform_output=transform_output,
        error_handler=error_handler,
    )
    self.steps.append(step)
    return self

execute(initial_context=None) async

Execute the chain of service calls.

Parameters:

Name Type Description Default
initial_context Optional[Dict[str, Any]]

Optional initial context data

None

Returns:

Type Description
ChainResult

Result of the chain execution

Source code in src/openmas/patterns/chaining.py
async def execute(self, initial_context: Optional[Dict[str, Any]] = None) -> ChainResult:
    """Execute the chain of service calls.

    Args:
        initial_context: Optional initial context data

    Returns:
        Result of the chain execution
    """
    context = initial_context or {}
    chain_result = ChainResult()

    # Track execution time
    start_time = asyncio.get_event_loop().time()

    # Execute steps sequentially
    for step in self.steps:
        step_result = await self._execute_step(step, context)
        chain_result.results.append(step_result)

        # Add result to context for next steps
        if step.name is not None:
            context[step.name] = step_result.result

        # Exit early if a step failed and no error handler recovered
        if step_result.status == ChainStepStatus.FAILURE:
            chain_result.successful = False
            break

    # Calculate total execution time
    chain_result.execution_time = asyncio.get_event_loop().time() - start_time

    # Set the final result to the result of the last successful step
    for step_result in reversed(chain_result.results):
        if step_result.status == ChainStepStatus.SUCCESS:
            chain_result.final_result = step_result.result
            break

    return chain_result

ChainBuilder

A builder for creating and executing service chains.

This builder provides a fluent interface for constructing service chains.

Source code in src/openmas/patterns/chaining.py
class ChainBuilder:
    """A builder for creating and executing service chains.

    This builder provides a fluent interface for constructing service chains.
    """

    def __init__(self, communicator: Any, name: str = "service_chain"):
        """Initialize the ChainBuilder.

        Args:
            communicator: The communicator to use for service calls
            name: Name of this chain for logging purposes
        """
        self.chain = ServiceChain(communicator, name)

    def add_step(
        self,
        target_service: str,
        method: str,
        parameters: Optional[Dict[str, Any]] = None,
        name: Optional[str] = None,
        retry_count: int = 0,
        retry_delay: float = 1.0,
        timeout: Optional[float] = None,
        condition: Optional[Callable[[Dict[str, Any]], bool]] = None,
        transform_input: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
        transform_output: Optional[Callable[[Any], Any]] = None,
        error_handler: Optional[Callable[[Exception, Dict[str, Any]], Any]] = None,
    ) -> "ChainBuilder":
        """Add a step to the chain.

        Args:
            target_service: The target service for this step
            method: The method to call on the service
            parameters: Parameters to pass to the method
            name: Optional name for this step
            retry_count: Number of times to retry on failure
            retry_delay: Delay between retries in seconds
            timeout: Timeout for this step in seconds
            condition: Optional condition function to determine if this step should execute
            transform_input: Optional function to transform input parameters
            transform_output: Optional function to transform the output
            error_handler: Optional function to handle errors

        Returns:
            The builder instance for method chaining
        """
        self.chain.add_step(
            target_service=target_service,
            method=method,
            parameters=parameters,
            name=name,
            retry_count=retry_count,
            retry_delay=retry_delay,
            timeout=timeout,
            condition=condition,
            transform_input=transform_input,
            transform_output=transform_output,
            error_handler=error_handler,
        )
        return self

    def build(self) -> ServiceChain:
        """Build and return the service chain.

        Returns:
            The constructed service chain
        """
        return self.chain

    async def execute(self, initial_context: Optional[Dict[str, Any]] = None) -> ChainResult:
        """Build and execute the service chain.

        Args:
            initial_context: Optional initial context data

        Returns:
            Result of the chain execution
        """
        return await self.chain.execute(initial_context)

__init__(communicator, name='service_chain')

Initialize the ChainBuilder.

Parameters:

Name Type Description Default
communicator Any

The communicator to use for service calls

required
name str

Name of this chain for logging purposes

'service_chain'
Source code in src/openmas/patterns/chaining.py
def __init__(self, communicator: Any, name: str = "service_chain"):
    """Initialize the ChainBuilder.

    Args:
        communicator: The communicator to use for service calls
        name: Name of this chain for logging purposes
    """
    self.chain = ServiceChain(communicator, name)

add_step(target_service, method, parameters=None, name=None, retry_count=0, retry_delay=1.0, timeout=None, condition=None, transform_input=None, transform_output=None, error_handler=None)

Add a step to the chain.

Parameters:

Name Type Description Default
target_service str

The target service for this step

required
method str

The method to call on the service

required
parameters Optional[Dict[str, Any]]

Parameters to pass to the method

None
name Optional[str]

Optional name for this step

None
retry_count int

Number of times to retry on failure

0
retry_delay float

Delay between retries in seconds

1.0
timeout Optional[float]

Timeout for this step in seconds

None
condition Optional[Callable[[Dict[str, Any]], bool]]

Optional condition function to determine if this step should execute

None
transform_input Optional[Callable[[Dict[str, Any]], Dict[str, Any]]]

Optional function to transform input parameters

None
transform_output Optional[Callable[[Any], Any]]

Optional function to transform the output

None
error_handler Optional[Callable[[Exception, Dict[str, Any]], Any]]

Optional function to handle errors

None

Returns:

Type Description
ChainBuilder

The builder instance for method chaining

Source code in src/openmas/patterns/chaining.py
def add_step(
    self,
    target_service: str,
    method: str,
    parameters: Optional[Dict[str, Any]] = None,
    name: Optional[str] = None,
    retry_count: int = 0,
    retry_delay: float = 1.0,
    timeout: Optional[float] = None,
    condition: Optional[Callable[[Dict[str, Any]], bool]] = None,
    transform_input: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
    transform_output: Optional[Callable[[Any], Any]] = None,
    error_handler: Optional[Callable[[Exception, Dict[str, Any]], Any]] = None,
) -> "ChainBuilder":
    """Add a step to the chain.

    Args:
        target_service: The target service for this step
        method: The method to call on the service
        parameters: Parameters to pass to the method
        name: Optional name for this step
        retry_count: Number of times to retry on failure
        retry_delay: Delay between retries in seconds
        timeout: Timeout for this step in seconds
        condition: Optional condition function to determine if this step should execute
        transform_input: Optional function to transform input parameters
        transform_output: Optional function to transform the output
        error_handler: Optional function to handle errors

    Returns:
        The builder instance for method chaining
    """
    self.chain.add_step(
        target_service=target_service,
        method=method,
        parameters=parameters,
        name=name,
        retry_count=retry_count,
        retry_delay=retry_delay,
        timeout=timeout,
        condition=condition,
        transform_input=transform_input,
        transform_output=transform_output,
        error_handler=error_handler,
    )
    return self

build()

Build and return the service chain.

Returns:

Type Description
ServiceChain

The constructed service chain

Source code in src/openmas/patterns/chaining.py
def build(self) -> ServiceChain:
    """Build and return the service chain.

    Returns:
        The constructed service chain
    """
    return self.chain

execute(initial_context=None) async

Build and execute the service chain.

Parameters:

Name Type Description Default
initial_context Optional[Dict[str, Any]]

Optional initial context data

None

Returns:

Type Description
ChainResult

Result of the chain execution

Source code in src/openmas/patterns/chaining.py
async def execute(self, initial_context: Optional[Dict[str, Any]] = None) -> ChainResult:
    """Build and execute the service chain.

    Args:
        initial_context: Optional initial context data

    Returns:
        Result of the chain execution
    """
    return await self.chain.execute(initial_context)

create_chain(communicator, name='service_chain')

Create a new service chain builder.

Parameters:

Name Type Description Default
communicator Any

The communicator to use for service calls

required
name str

Name of this chain for logging purposes

'service_chain'

Returns:

Type Description
ChainBuilder

A new chain builder

Source code in src/openmas/patterns/chaining.py
def create_chain(communicator: Any, name: str = "service_chain") -> ChainBuilder:
    """Create a new service chain builder.

    Args:
        communicator: The communicator to use for service calls
        name: Name of this chain for logging purposes

    Returns:
        A new chain builder
    """
    return ChainBuilder(communicator, name)

execute_chain(communicator, steps, initial_context=None, name='service_chain') async

Execute a chain of service calls defined by steps.

This is a convenience function for creating and executing a chain in a single call.

Parameters:

Name Type Description Default
communicator Any

The communicator to use for service calls

required
steps List[Dict[str, Any]]

List of step definitions, each a dict with parameters for add_step

required
initial_context Optional[Dict[str, Any]]

Optional initial context data

None
name str

Name of this chain for logging purposes

'service_chain'

Returns:

Type Description
ChainResult

Result of the chain execution

Source code in src/openmas/patterns/chaining.py
async def execute_chain(
    communicator: Any,
    steps: List[Dict[str, Any]],
    initial_context: Optional[Dict[str, Any]] = None,
    name: str = "service_chain",
) -> ChainResult:
    """Execute a chain of service calls defined by steps.

    This is a convenience function for creating and executing a chain in a single call.

    Args:
        communicator: The communicator to use for service calls
        steps: List of step definitions, each a dict with parameters for add_step
        initial_context: Optional initial context data
        name: Name of this chain for logging purposes

    Returns:
        Result of the chain execution
    """
    builder = ChainBuilder(communicator, name)

    for step_def in steps:
        builder.add_step(**step_def)

    return await builder.execute(initial_context)

Deployment

Generators for deployment configurations from OpenMAS metadata.