Skip to content

Custom Adapters

Build your own executor to connect Aegis to any system.

The BaseExecutor Interface

from aegis.adapters.base import BaseExecutor
from aegis.core.action import Action
from aegis.core.result import Result, ResultStatus

class MyExecutor(BaseExecutor):
    async def execute(self, action: Action) -> Result:
        """Required: execute a single action."""
        ...

    async def verify(self, action: Action, result: Result) -> bool:
        """Optional: verify the action completed correctly."""
        return result.ok

    async def setup(self) -> None:
        """Optional: initialize resources (called once before execution)."""

    async def teardown(self) -> None:
        """Optional: clean up resources (called once after execution)."""

Only execute() is required. The others have sensible defaults.

Built-in: HttpxExecutor

For REST APIs, use the built-in HttpxExecutor instead of building your own:

from aegis.adapters.httpx_adapter import HttpxExecutor

executor = HttpxExecutor(
    base_url="https://api.example.com",
    default_headers={"Authorization": "Bearer ..."},
    timeout=30.0,
)
runtime = Runtime(executor=executor, policy=Policy.from_yaml("policy.yaml"))

# Action types map to HTTP methods: get, post, put, patch, delete
plan = runtime.plan([
    Action("get", "/users"),
    Action("post", "/users", params={"json": {"name": "Alice"}}),
])

Install with: pip install 'agent-aegis[httpx]'

Example: Custom REST API Executor

If you need custom behavior beyond what HttpxExecutor provides:

import httpx
from aegis.adapters.base import BaseExecutor
from aegis.core.action import Action
from aegis.core.result import Result, ResultStatus

class MyAPIExecutor(BaseExecutor):
    def __init__(self, base_url: str, headers: dict | None = None):
        self._base_url = base_url
        self._headers = headers or {}
        self._client: httpx.AsyncClient | None = None

    async def setup(self) -> None:
        self._client = httpx.AsyncClient(
            base_url=self._base_url,
            headers=self._headers,
        )

    async def teardown(self) -> None:
        if self._client:
            await self._client.aclose()

    async def execute(self, action: Action) -> Result:
        method_map = {
            "read": "GET",
            "write": "POST",
            "update": "PUT",
            "delete": "DELETE",
        }
        method = method_map.get(action.type, "POST")
        path = action.params.get("path", "/")

        try:
            response = await self._client.request(method, path, json=action.params)
            response.raise_for_status()
            return Result(
                action=action,
                status=ResultStatus.SUCCESS,
                data=response.json(),
            )
        except Exception as e:
            return Result(
                action=action,
                status=ResultStatus.FAILED,
                error=str(e),
            )

Using Your Adapter

runtime = Runtime(
    executor=MyAPIExecutor(
        base_url="https://api.example.com",
        headers={"Authorization": "Bearer ..."},
    ),
    policy=Policy.from_yaml("policy.yaml"),
)

Custom Verification

Override verify() to add post-execution checks:

class StrictExecutor(BaseExecutor):
    async def execute(self, action: Action) -> Result:
        ...

    async def verify(self, action: Action, result: Result) -> bool:
        if action.type == "write" and result.ok:
            # Re-read the record to confirm the write took effect
            check = await self._read_back(action)
            return check is not None
        return result.ok

If verification fails, the result is marked as FAILED even if the execution itself succeeded.