Module adcp.server.a2a_server
A2A server support for ADCP handlers.
Bridges ADCPHandler to the a2a-sdk server framework so the same handler can be served over both MCP and A2A transports.
from adcp.server import ADCPHandler, serve
serve(MyHandler(), name="my-agent", transport="a2a")
Functions
def create_a2a_server(handler: ADCPHandler,
*,
name: str = 'adcp-agent',
port: int | None = None,
description: str | None = None,
version: str = '1.0.0',
test_controller: TestControllerStore | None = None) -> Any
Expand source code
def create_a2a_server(
    handler: ADCPHandler,
    *,
    name: str = "adcp-agent",
    port: int | None = None,
    description: str | None = None,
    version: str = "1.0.0",
    test_controller: TestControllerStore | None = None,
) -> Any:
    """Create an A2A Starlette application from an ADCP handler.

    Args:
        handler: An ADCPHandler subclass instance.
        name: Agent name shown in the A2A agent card.
        port: Port number (used in the agent card URL). When ``None`` (or
            any other falsy value such as ``0``), the ``PORT`` environment
            variable is used instead, defaulting to ``3001``.
        description: Agent description for the agent card.
        version: Agent version string.
        test_controller: Optional TestControllerStore for storyboard testing.
            When a store is passed, the test-controller skills are added to
            the agent card as extra skills.

    Returns:
        A Starlette app ready to be run with uvicorn.

    Raises:
        ValueError: If the ``PORT`` environment variable is consulted and
            does not contain an integer.
    """
    # Function-scope import, kept from the original.
    # NOTE(review): presumably this lets the module import without the
    # a2a Starlette extras installed - confirm.
    from a2a.server.apps.jsonrpc.starlette_app import A2AStarletteApplication

    resolved_port = port or int(os.environ.get("PORT", "3001"))

    # Use an identity check rather than truthiness: ADCPAgentExecutor
    # registers the comply_test_controller skill whenever the store
    # ``is not None``, so the agent card must advertise it under exactly the
    # same condition (a store object may legitimately evaluate as falsy).
    has_test_controller = test_controller is not None

    executor = ADCPAgentExecutor(handler, test_controller=test_controller)
    agent_card = _build_agent_card(
        handler,
        name=name,
        port=resolved_port,
        description=description,
        version=version,
        extra_skills=_test_controller_skills() if has_test_controller else None,
    )

    task_store = InMemoryTaskStore()
    request_handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=task_store,
    )

    a2a_app = A2AStarletteApplication(
        agent_card=agent_card,
        http_handler=request_handler,
    )
    return a2a_app.build()
Args
handler: An ADCPHandler subclass instance.
name: Agent name shown in the A2A agent card.
port: Port number (used in the agent card URL).
description: Agent description for the agent card.
version: Agent version string.
test_controller: Optional TestControllerStore for storyboard testing.
Returns
A Starlette app ready to be run with uvicorn.
Classes
class ADCPAgentExecutor(handler: ADCPHandler, test_controller: TestControllerStore | None = None)
Expand source code
class ADCPAgentExecutor(AgentExecutor):
    """Bridges ADCPHandler methods to the a2a-sdk AgentExecutor interface.

    Incoming A2A messages are parsed to extract the ADCP skill name and
    parameters, dispatched to the matching handler method, and the result is
    published back as A2A Task events.

    Expects the explicit skill invocation format used by A2AAdapter:

        DataPart(data={"skill": "get_products", "parameters": {...}})

    A ``TextPart`` whose text is a JSON object of the same shape is accepted
    as a fallback.
    """

    def __init__(
        self,
        handler: ADCPHandler,
        test_controller: TestControllerStore | None = None,
    ) -> None:
        """Build one tool caller per skill the handler supports.

        Args:
            handler: The ADCP handler whose tools are exposed as skills.
            test_controller: Optional store; when given, the
                ``comply_test_controller`` skill is backed by it.
        """
        self._handler = handler
        self._tool_callers: dict[str, Any] = {}

        # Build tool callers for all tools this handler supports.
        # Skip comply_test_controller unless the seller passed a
        # TestControllerStore; otherwise we would advertise a skill
        # backed only by the handler's not-supported stub.
        for tool_def in get_tools_for_handler(handler):
            name = tool_def["name"]
            if name == "comply_test_controller" and test_controller is None:
                continue
            self._tool_callers[name] = create_tool_caller(handler, name)

        if test_controller is not None:
            # Replaces any handler-derived caller registered above with the
            # store-backed one.
            self._register_test_controller(test_controller)

    @property
    def supported_skills(self) -> list[str]:
        """List of skill names this executor can handle."""
        return list(self._tool_callers)

    def _register_test_controller(self, store: TestControllerStore) -> None:
        """Register comply_test_controller as a callable skill."""

        async def _call_test_controller(
            params: dict[str, Any], context: ToolContext | None = None
        ) -> Any:
            # ``context`` is accepted so the signature matches the other
            # tool callers; it is not forwarded to the store handler.
            return await _handle_test_controller(store, params)

        self._tool_callers["comply_test_controller"] = _call_test_controller

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        """Execute an ADCP skill from an incoming A2A message.

        Publishes exactly one task event to ``event_queue``: a completed task
        carrying the skill result, or a failed task when no skill is given,
        the skill is unknown, or the skill raises.
        """
        skill_name, params = self._parse_request(context)

        if skill_name is None:
            await self._send_error(event_queue, context, "No skill specified in message")
            return

        if skill_name not in self._tool_callers:
            await self._send_error(event_queue, context, f"Unknown skill: {skill_name}")
            return

        tool_context = _tool_context_from_request(context)

        try:
            result = await self._tool_callers[skill_name](params, tool_context)
            await self._send_result(event_queue, context, skill_name, result)
        except ADCPError as exc:
            # Application-layer AdCP error (IdempotencyConflictError etc.).
            # Emit a failed task with the adcp_error in a DataPart per
            # transport-errors.mdx §A2A Binding, plus a human-readable text
            # part. The JSON-RPC channel is reserved for transport-level
            # errors (auth rejected, rate-limited pre-dispatch).
            logger.info("AdCP application error for skill %s: %s", skill_name, exc)
            await self._send_adcp_error(event_queue, context, exc)
        except Exception:
            # Boundary handler: log the traceback server-side and report a
            # generic failure so exception details are not sent to the caller.
            logger.exception("Error executing skill %s", skill_name)
            await self._send_error(event_queue, context, f"Skill execution failed: {skill_name}")

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        """ADCP operations are synchronous; cancellation sets state to canceled."""
        event = _make_task(
            context,
            state=TaskState.canceled,
            message="Task canceled",
        )
        await event_queue.enqueue_event(event)

    # ------------------------------------------------------------------
    # Message parsing
    # ------------------------------------------------------------------

    @staticmethod
    def _skill_from_payload(payload: Any) -> tuple[str | None, dict[str, Any]]:
        """Validate a decoded invocation payload.

        Returns ``(skill, parameters)`` when ``payload`` is a dict with a
        truthy ``"skill"`` entry, otherwise ``(None, {})``. ``parameters``
        that are missing or not a dict are replaced by an empty dict.
        """
        if not isinstance(payload, dict):
            return None, {}
        skill = payload.get("skill")
        if not skill:
            return None, {}
        params = payload.get("parameters", {})
        return str(skill), params if isinstance(params, dict) else {}

    def _parse_request(self, context: RequestContext) -> tuple[str | None, dict[str, Any]]:
        """Extract skill name and parameters from the A2A message.

        Supports two formats:

        1. Explicit skill invocation via DataPart:
           DataPart(data={"skill": "get_products", "parameters": {...}})
        2. Fallback via TextPart whose text is a JSON object of the same
           shape (best-effort parse).

        DataParts take priority over TextParts regardless of part order.

        Returns:
            ``(skill, parameters)``, or ``(None, {})`` when the message is
            missing, has no parts, or no part names a skill.
        """
        msg = context.message
        if msg is None or not msg.parts:
            return None, {}

        # Parts may be wrapped in a container exposing the concrete part as
        # ``.root``; unwrap once and reuse for both passes.
        inner_parts = [getattr(part, "root", part) for part in msg.parts]

        # Try DataPart first (explicit skill invocation).
        for inner in inner_parts:
            if isinstance(inner, DataPart):
                skill, params = self._skill_from_payload(inner.data)
                if skill is not None:
                    return skill, params

        # Fallback: try to parse TextPart as JSON.
        for inner in inner_parts:
            if isinstance(inner, TextPart):
                parsed = self._parse_text_request(inner.text)
                if parsed[0] is not None:
                    return parsed

        return None, {}

    def _parse_text_request(self, text: str) -> tuple[str | None, dict[str, Any]]:
        """Best-effort parse of a text request for skill + params.

        The text must be a JSON object; it is validated with the same rules
        as a DataPart payload (truthy ``"skill"``, dict ``"parameters"``).
        Anything else yields ``(None, {})``.
        """
        try:
            data = json.loads(text)
        except (json.JSONDecodeError, TypeError):
            return None, {}
        return self._skill_from_payload(data)

    # ------------------------------------------------------------------
    # Response helpers
    # ------------------------------------------------------------------

    async def _send_result(
        self,
        event_queue: EventQueue,
        context: RequestContext,
        skill_name: str,
        result: Any,
    ) -> None:
        """Publish a completed task with the skill result."""
        # Normalize the result to a dict: objects exposing ``model_dump`` are
        # dumped in JSON mode without None fields, dicts pass through, and
        # anything else is wrapped under a "result" key.
        if hasattr(result, "model_dump"):
            data = result.model_dump(mode="json", exclude_none=True)
        elif isinstance(result, dict):
            data = result
        else:
            data = {"result": result}

        task = _make_task(
            context,
            state=TaskState.completed,
            data=data,
            message=f"Completed {skill_name}",
        )
        await event_queue.enqueue_event(task)

    async def _send_error(
        self,
        event_queue: EventQueue,
        context: RequestContext,
        error_msg: str,
    ) -> None:
        """Publish a failed task."""
        task = _make_task(
            context,
            state=TaskState.failed,
            message=error_msg,
        )
        await event_queue.enqueue_event(task)

    async def _send_adcp_error(
        self,
        event_queue: EventQueue,
        context: RequestContext,
        exc: ADCPError,
    ) -> None:
        """Publish a failed task carrying an AdCP ``adcp_error`` payload.

        Follows transport-errors.mdx §A2A Binding: failed task with artifact
        containing a ``DataPart`` keyed under ``adcp_error`` plus a terse
        ``TextPart`` for human/LLM consumption.
        """
        # Derive the spec error code. ADCPTaskError carries a list of codes
        # (e.g. IdempotencyConflictError → IDEMPOTENCY_CONFLICT); fall back
        # to a generic INTERNAL_ERROR when the exception doesn't supply one.
        code = "INTERNAL_ERROR"
        if isinstance(exc, ADCPTaskError) and exc.error_codes:
            code = str(exc.error_codes[0])

        adcp_error: dict[str, Any] = {
            "code": code,
            "message": exc.message,
        }

        # Optional fields are only included when they have a value.
        recovery = STANDARD_ERROR_CODES.get(code, {}).get("recovery")
        if recovery:
            adcp_error["recovery"] = recovery
        suggestion = getattr(exc, "suggestion", None)
        if suggestion:
            adcp_error["suggestion"] = suggestion

        task = _make_task(
            context,
            state=TaskState.failed,
            data={"adcp_error": adcp_error},
            message=exc.message,
        )
        await event_queue.enqueue_event(task)
Incoming A2A messages are parsed to extract the ADCP skill name and parameters, dispatched to the matching handler method, and the result is published back as A2A Task events.
Expects the explicit skill invocation format used by A2AAdapter: DataPart(data={"skill": "get_products", "parameters": {…}})
Ancestors
- a2a.server.agent_execution.agent_executor.AgentExecutor
- abc.ABC
Instance variables
prop supported_skills: list[str]
Expand source code
@property
def supported_skills(self) -> list[str]:
    """Return the names of the skills registered on this executor.

    A new list is built on every access, in the insertion order of the
    internal tool-caller mapping.
    """
    registered = self._tool_callers
    return [*registered]
Methods
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None
Expand source code
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
    """Publish a task in the canceled state for this request.

    ADCP operations are synchronous; cancellation sets state to canceled.
    """
    await event_queue.enqueue_event(
        _make_task(context, state=TaskState.canceled, message="Task canceled")
    )
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None
Expand source code
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
    """Run the ADCP skill named in an incoming A2A message.

    One task event is published to ``event_queue``: a completed task with
    the skill result, or a failed task when the message names no skill, the
    skill is not registered, or the skill raises.
    """
    skill_name, params = self._parse_request(context)

    # Requests that cannot be routed are rejected up front.
    rejection: str | None = None
    if skill_name is None:
        rejection = "No skill specified in message"
    elif skill_name not in self._tool_callers:
        rejection = f"Unknown skill: {skill_name}"
    if rejection is not None:
        await self._send_error(event_queue, context, rejection)
        return

    tool_context = _tool_context_from_request(context)
    invoke = self._tool_callers[skill_name]

    try:
        outcome = await invoke(params, tool_context)
        await self._send_result(event_queue, context, skill_name, outcome)
    except ADCPError as adcp_exc:
        # Application-layer AdCP error (IdempotencyConflictError etc.):
        # reported as a failed task whose DataPart carries the adcp_error,
        # per transport-errors.mdx §A2A Binding, alongside a human-readable
        # text part. Transport-level errors (auth rejected, rate-limited
        # pre-dispatch) are what the JSON-RPC channel is reserved for.
        logger.info("AdCP application error for skill %s: %s", skill_name, adcp_exc)
        await self._send_adcp_error(event_queue, context, adcp_exc)
    except Exception:
        # Anything else: log with traceback and publish a generic failure.
        logger.exception("Error executing skill %s", skill_name)
        await self._send_error(event_queue, context, f"Skill execution failed: {skill_name}")