Module adcp.server.a2a_server

A2A server support for ADCP handlers.

Bridges ADCPHandler to the a2a-sdk server framework so the same handler can be served over both MCP and A2A transports.

from adcp.server import ADCPHandler, serve
serve(MyHandler(), name="my-agent", transport="a2a")

Functions

def create_a2a_server(handler: ADCPHandler,
*,
name: str = 'adcp-agent',
port: int | None = None,
description: str | None = None,
version: str = '1.0.0',
test_controller: TestControllerStore | None = None) ‑> Any
Expand source code
def create_a2a_server(
    handler: ADCPHandler,
    *,
    name: str = "adcp-agent",
    port: int | None = None,
    description: str | None = None,
    version: str = "1.0.0",
    test_controller: TestControllerStore | None = None,
) -> Any:
    """Create an A2A Starlette application from an ADCP handler.

    Wires an ``ADCPAgentExecutor`` for ``handler`` into a
    ``DefaultRequestHandler`` backed by an ``InMemoryTaskStore`` and wraps
    it, together with the generated agent card, in an
    ``A2AStarletteApplication``.

    Args:
        handler: An ADCPHandler subclass instance.
        name: Agent name shown in the A2A agent card.
        port: Port number (used in the agent card URL). When omitted (or
            ``0``), the ``PORT`` environment variable is used, falling
            back to ``3001``.
        description: Agent description for the agent card.
        version: Agent version string.
        test_controller: Optional TestControllerStore for storyboard testing.
            When provided, the test-controller skills are added to the
            agent card and the executor registers the matching skill.

    Returns:
        A Starlette app ready to be run with uvicorn.

    Raises:
        ValueError: If ``port`` is not given and ``PORT`` is set to a
            value that is not an integer.
    """
    # NOTE(review): imported inside the function rather than at module
    # level — presumably so importing this module does not require the
    # Starlette app dependency; confirm before hoisting.
    from a2a.server.apps.jsonrpc.starlette_app import A2AStarletteApplication

    # A falsy port (None or 0) defers to $PORT, then to 3001.
    resolved_port = port or int(os.environ.get("PORT", "3001"))

    executor = ADCPAgentExecutor(handler, test_controller=test_controller)

    # Use the same ``is not None`` test as ADCPAgentExecutor.__init__ so the
    # skills advertised in the card always match the skills the executor
    # actually registers, even if the store object happens to be falsy.
    extra_skills = _test_controller_skills() if test_controller is not None else None

    agent_card = _build_agent_card(
        handler,
        name=name,
        port=resolved_port,
        description=description,
        version=version,
        extra_skills=extra_skills,
    )

    task_store = InMemoryTaskStore()

    request_handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=task_store,
    )

    a2a_app = A2AStarletteApplication(
        agent_card=agent_card,
        http_handler=request_handler,
    )

    return a2a_app.build()

Create an A2A Starlette application from an ADCP handler.

Args

handler
An ADCPHandler subclass instance.
name
Agent name shown in the A2A agent card.
port
Port number (used in the agent card URL).
description
Agent description for the agent card.
version
Agent version string.
test_controller
Optional TestControllerStore for storyboard testing.

Returns

A Starlette app ready to be run with uvicorn.

Classes

class ADCPAgentExecutor (handler: ADCPHandler, test_controller: TestControllerStore | None = None)
Expand source code
class ADCPAgentExecutor(AgentExecutor):
    """Bridges ADCPHandler methods to the a2a-sdk AgentExecutor interface.

    Incoming A2A messages are parsed to extract the ADCP skill name and
    parameters, dispatched to the matching handler method, and the result
    is published back as A2A Task events.

    Expects the explicit skill invocation format used by A2AAdapter:
        DataPart(data={"skill": "get_products", "parameters": {...}})

    As a fallback, a TextPart whose text is a JSON object with the same
    ``skill`` / ``parameters`` keys is also accepted.
    """

    def __init__(
        self,
        handler: ADCPHandler,
        test_controller: TestControllerStore | None = None,
    ) -> None:
        """Build the skill dispatch table for ``handler``.

        Args:
            handler: The ADCP handler whose tools are exposed as skills.
            test_controller: Optional store; when given, the
                ``comply_test_controller`` skill is registered against it.
        """
        self._handler = handler
        # Maps skill name -> awaitable callable taking (params, context).
        self._tool_callers: dict[str, Any] = {}

        # Build tool callers for all tools this handler supports.
        # Skip comply_test_controller unless the seller passed a
        # TestControllerStore; otherwise we would advertise a skill
        # backed only by the handler's not-supported stub.
        tool_defs = get_tools_for_handler(handler)
        for tool_def in tool_defs:
            name = tool_def["name"]
            if name == "comply_test_controller" and test_controller is None:
                continue
            self._tool_callers[name] = create_tool_caller(handler, name)

        if test_controller is not None:
            self._register_test_controller(test_controller)

    @property
    def supported_skills(self) -> list[str]:
        """List of skill names this executor can handle."""
        return list(self._tool_callers.keys())

    def _register_test_controller(self, store: TestControllerStore) -> None:
        """Register comply_test_controller as a callable skill.

        The registered caller ignores the tool context and forwards the
        parameters to ``_handle_test_controller`` together with ``store``.
        Any caller already registered under that name is replaced.
        """

        async def _call_test_controller(
            params: dict[str, Any], context: ToolContext | None = None
        ) -> Any:
            return await _handle_test_controller(store, params)

        self._tool_callers["comply_test_controller"] = _call_test_controller

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        """Execute an ADCP skill from an incoming A2A message.

        Every outcome is reported as a task event on ``event_queue``:
        a completed task carrying the result, or a failed task when no
        skill is named, the skill is unknown, or the skill raises.

        Args:
            context: The A2A request context holding the incoming message.
            event_queue: Queue that receives the resulting task event.
        """
        skill_name, params = self._parse_request(context)

        if skill_name is None:
            await self._send_error(event_queue, context, "No skill specified in message")
            return

        if skill_name not in self._tool_callers:
            await self._send_error(event_queue, context, f"Unknown skill: {skill_name}")
            return

        tool_context = _tool_context_from_request(context)
        try:
            result = await self._tool_callers[skill_name](params, tool_context)
            await self._send_result(event_queue, context, skill_name, result)
        except ADCPError as exc:
            # Application-layer AdCP error (IdempotencyConflictError etc.).
            # Emit a failed task with the adcp_error in a DataPart per
            # transport-errors.mdx §A2A Binding, plus a human-readable text
            # part. The JSON-RPC channel is reserved for transport-level
            # errors (auth rejected, rate-limited pre-dispatch).
            logger.info("AdCP application error for skill %s: %s", skill_name, exc)
            await self._send_adcp_error(event_queue, context, exc)
        except Exception:
            # Boundary handler: log the traceback server-side and report a
            # generic failure so exception details are not sent to the peer.
            logger.exception("Error executing skill %s", skill_name)
            await self._send_error(event_queue, context, f"Skill execution failed: {skill_name}")

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        """ADCP operations are synchronous; cancellation sets state to canceled."""
        event = _make_task(
            context,
            state=TaskState.canceled,
            message="Task canceled",
        )
        await event_queue.enqueue_event(event)

    # ------------------------------------------------------------------
    # Message parsing
    # ------------------------------------------------------------------

    @staticmethod
    def _invocation_from_dict(data: dict[str, Any]) -> tuple[str | None, dict[str, Any]]:
        """Validate a ``{"skill": ..., "parameters": ...}`` mapping.

        Shared by the DataPart and TextPart paths so both apply the same
        rules.

        Returns:
            ``(skill, params)`` where ``skill`` is the stringified skill
            name and ``params`` is the parameters dict (``{}`` when the
            key is absent or its value is not a dict); ``(None, {})``
            when the skill is missing, null or empty.
        """
        skill = data.get("skill")
        if not skill:
            return None, {}
        params = data.get("parameters", {})
        return str(skill), params if isinstance(params, dict) else {}

    def _parse_request(self, context: RequestContext) -> tuple[str | None, dict[str, Any]]:
        """Extract skill name and parameters from the A2A message.

        Supports two formats:
        1. Explicit skill invocation via DataPart:
           DataPart(data={"skill": "get_products", "parameters": {...}})
        2. Fallback via TextPart whose text is a JSON object of the same
           shape (best-effort parse)

        All DataParts are examined before any TextPart, and the first
        part that names a skill wins.

        Returns:
            ``(skill_name, params)``, or ``(None, {})`` when the message
            is missing, has no parts, or no part names a skill.
        """
        msg = context.message
        if msg is None or not msg.parts:
            return None, {}

        # Try DataPart first (explicit skill invocation)
        for part in msg.parts:
            # Parts may be wrapped in a container exposing the concrete
            # part as ``.root``; unwrap when that attribute is present.
            inner = part.root if hasattr(part, "root") else part
            if isinstance(inner, DataPart) and isinstance(inner.data, dict):
                skill, params = self._invocation_from_dict(inner.data)
                if skill is not None:
                    return skill, params

        # Fallback: try to parse TextPart as JSON
        for part in msg.parts:
            inner = part.root if hasattr(part, "root") else part
            if isinstance(inner, TextPart):
                parsed = self._parse_text_request(inner.text)
                if parsed[0] is not None:
                    return parsed

        return None, {}

    def _parse_text_request(self, text: str) -> tuple[str | None, dict[str, Any]]:
        """Best-effort parse of a text request for skill + params.

        The text must be a JSON object; it is then validated with the
        same rules as a DataPart payload, so a null/empty ``skill`` is
        treated as "no skill" and a non-dict ``parameters`` value is
        replaced by ``{}``.

        Returns:
            ``(skill_name, params)``, or ``(None, {})`` when the text is
            not JSON, is not a JSON object, or names no skill.
        """
        try:
            data = json.loads(text)
        except (json.JSONDecodeError, TypeError):
            # Not JSON (or not a str/bytes value): there is no skill to run.
            return None, {}
        if not isinstance(data, dict):
            return None, {}
        return self._invocation_from_dict(data)

    # ------------------------------------------------------------------
    # Response helpers
    # ------------------------------------------------------------------

    async def _send_result(
        self,
        event_queue: EventQueue,
        context: RequestContext,
        skill_name: str,
        result: Any,
    ) -> None:
        """Publish a completed task with the skill result.

        Objects exposing ``model_dump`` are dumped in JSON mode with
        ``None`` fields omitted, dicts are passed through unchanged, and
        any other value is wrapped as ``{"result": value}``.
        """
        # Normalize result to a JSON-safe dict
        if hasattr(result, "model_dump"):
            data = result.model_dump(mode="json", exclude_none=True)
        elif not isinstance(result, dict):
            data = {"result": result}
        else:
            data = result

        task = _make_task(
            context,
            state=TaskState.completed,
            data=data,
            message=f"Completed {skill_name}",
        )
        await event_queue.enqueue_event(task)

    async def _send_error(
        self,
        event_queue: EventQueue,
        context: RequestContext,
        error_msg: str,
    ) -> None:
        """Publish a failed task whose message is ``error_msg``."""
        task = _make_task(
            context,
            state=TaskState.failed,
            message=error_msg,
        )
        await event_queue.enqueue_event(task)

    async def _send_adcp_error(
        self,
        event_queue: EventQueue,
        context: RequestContext,
        exc: ADCPError,
    ) -> None:
        """Publish a failed task carrying an AdCP ``adcp_error`` payload.

        Follows transport-errors.mdx §A2A Binding: failed task with artifact
        containing a ``DataPart`` keyed under ``adcp_error`` plus a terse
        ``TextPart`` for human/LLM consumption.

        The payload always has ``code`` and ``message``; ``recovery`` and
        ``suggestion`` are included only when a non-empty value exists.
        """
        # Derive the spec error code. ADCPTaskError carries a list of codes
        # (e.g. IdempotencyConflictError → IDEMPOTENCY_CONFLICT); fall back
        # to a generic INTERNAL_ERROR when the exception doesn't supply one.
        code = "INTERNAL_ERROR"
        if isinstance(exc, ADCPTaskError) and exc.error_codes:
            code = str(exc.error_codes[0])

        adcp_error: dict[str, Any] = {
            "code": code,
            "message": exc.message,
        }
        # Recovery hint comes from the STANDARD_ERROR_CODES table entry for
        # this code, when there is one.
        recovery = STANDARD_ERROR_CODES.get(code, {}).get("recovery")
        if recovery:
            adcp_error["recovery"] = recovery
        # Not every ADCPError defines ``suggestion``; read it defensively.
        suggestion = getattr(exc, "suggestion", None)
        if suggestion:
            adcp_error["suggestion"] = suggestion

        task = _make_task(
            context,
            state=TaskState.failed,
            data={"adcp_error": adcp_error},
            message=exc.message,
        )
        await event_queue.enqueue_event(task)

Bridges ADCPHandler methods to the a2a-sdk AgentExecutor interface.

Incoming A2A messages are parsed to extract the ADCP skill name and parameters, dispatched to the matching handler method, and the result is published back as A2A Task events.

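Expects the explicit skill invocation format used by A2AAdapter: DataPart(data={"skill": "get_products", "parameters": {…}}). As a fallback, a TextPart whose text is a JSON object with the same "skill" and "parameters" keys is also accepted.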

Ancestors

  • a2a.server.agent_execution.agent_executor.AgentExecutor
  • abc.ABC

Instance variables

prop supported_skills : list[str]
Expand source code
@property
def supported_skills(self) -> list[str]:
    """Return the names of all skills registered with this executor.

    A new list is built on every access, so callers may mutate the
    result without affecting the executor's dispatch table.
    """
    return [*self._tool_callers]

List of skill names this executor can handle.

Methods

async def cancel(self, context: RequestContext, event_queue: EventQueue) ‑> None
Expand source code
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
    """Report the task for ``context`` as canceled.

    ADCP operations are synchronous, so this method only enqueues a task
    event in the ``canceled`` state on ``event_queue``.
    """
    await event_queue.enqueue_event(
        _make_task(context, state=TaskState.canceled, message="Task canceled")
    )

ADCP operations are synchronous; cancellation sets state to canceled.

async def execute(self, context: RequestContext, event_queue: EventQueue) ‑> None
Expand source code
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
    """Run the ADCP skill named in an incoming A2A message.

    The outcome is always published on ``event_queue``: a completed task
    with the skill result, or a failed task when the message names no
    skill, names an unregistered skill, or the skill raises.
    """
    skill, arguments = self._parse_request(context)

    # Reject requests that cannot be dispatched before doing any work.
    if skill is None:
        rejection = "No skill specified in message"
    elif skill not in self._tool_callers:
        rejection = f"Unknown skill: {skill}"
    else:
        rejection = None

    if rejection is not None:
        await self._send_error(event_queue, context, rejection)
        return

    call_context = _tool_context_from_request(context)
    try:
        outcome = await self._tool_callers[skill](arguments, call_context)
        await self._send_result(event_queue, context, skill, outcome)
    except ADCPError as adcp_exc:
        # AdCP application errors (e.g. IdempotencyConflictError) are
        # reported as a failed task whose DataPart carries adcp_error,
        # alongside a human-readable text part, per transport-errors.mdx
        # §A2A Binding. JSON-RPC errors stay reserved for transport-level
        # failures (auth rejected, rate-limited pre-dispatch).
        logger.info("AdCP application error for skill %s: %s", skill, adcp_exc)
        await self._send_adcp_error(event_queue, context, adcp_exc)
    except Exception:
        # Anything else: keep the traceback in the server log and report a
        # generic failure to the caller.
        logger.exception("Error executing skill %s", skill)
        await self._send_error(event_queue, context, f"Skill execution failed: {skill}")

Execute an ADCP skill from an incoming A2A message.