Module adcp.protocols.a2a
Classes
class A2AAdapter(agent_config: AgentConfig)
Expand source code
class A2AAdapter(ProtocolAdapter):
    """Adapter for A2A protocol following the Agent2Agent specification.

    Tool calls are sent as a text message to the agent's ``message/send``
    endpoint; tool discovery reads the agent card at
    ``/.well-known/agent.json``. One ``httpx.AsyncClient`` is created lazily
    and reused until :meth:`close` is called.
    """

    def __init__(self, agent_config: AgentConfig):
        """Initialize A2A adapter with reusable HTTP client."""
        super().__init__(agent_config)
        # Created on first use by _get_client(); reset to None by close().
        self._client: httpx.AsyncClient | None = None

    async def _get_client(self) -> httpx.AsyncClient:
        """Get or create the HTTP client with connection pooling."""
        if self._client is None:
            # Configure connection pooling for better performance
            limits = httpx.Limits(
                max_keepalive_connections=10,
                max_connections=20,
                keepalive_expiry=30.0,
            )
            self._client = httpx.AsyncClient(limits=limits)
            logger.debug(
                f"Created HTTP client with connection pooling for agent {self.agent_config.id}"
            )
        return self._client

    async def close(self) -> None:
        """Close the HTTP client and clean up resources."""
        if self._client is not None:
            logger.debug(f"Closing A2A adapter client for agent {self.agent_config.id}")
            await self._client.aclose()
            self._client = None

    def _build_headers(self) -> dict[str, str]:
        """Build the request headers, including auth when a token is configured."""
        headers = {"Content-Type": "application/json"}
        if self.agent_config.auth_token:
            # Support custom auth headers and types
            if self.agent_config.auth_type == "bearer":
                headers[self.agent_config.auth_header] = f"Bearer {self.agent_config.auth_token}"
            else:
                headers[self.agent_config.auth_header] = self.agent_config.auth_token
        return headers

    def _endpoint(self, path: str) -> str:
        """Join ``path`` onto the agent URI without producing a double slash."""
        base_uri = str(self.agent_config.agent_uri).rstrip("/")
        return f"{base_uri}/{path}"

    @staticmethod
    def _extract_error_text(response_data: dict[str, Any], default: str = "Task failed") -> Any:
        """Return the first message part's ``text``, or ``default`` if there is none."""
        message = response_data.get("message") or {}
        parts = message.get("parts") or []
        # An empty ``parts`` list (or a non-dict first part) must not raise here:
        # the caller only catches httpx errors.
        if parts and isinstance(parts[0], dict):
            return parts[0].get("text", default)
        return default

    async def _call_a2a_tool(self, tool_name: str, params: dict[str, Any]) -> TaskResult[Any]:
        """
        Call a tool using A2A protocol.

        The request is POSTed to the agent's ``message/send`` endpoint as a
        single user message with one text part and a fresh ``context_id``.

        Args:
            tool_name: Name of the tool to execute.
            params: Tool parameters; serialized to JSON inside the message text.

        Returns:
            A ``TaskResult`` whose status depends on ``task.status`` in the
            response: ``completed``/``working`` -> COMPLETED, ``failed`` ->
            FAILED, anything else -> SUBMITTED with the raw response as data.
            Any ``httpx.HTTPError`` is converted into a FAILED result rather
            than raised.
        """
        start_time = time.time() if self.agent_config.debug else None
        client = await self._get_client()
        headers = self._build_headers()

        # Construct A2A message
        message = {
            "role": "user",
            "parts": [
                {
                    "type": "text",
                    "text": self._format_tool_request(tool_name, params),
                }
            ],
        }

        # A2A uses message/send endpoint
        url = self._endpoint("message/send")
        request_data = {
            "message": message,
            "context_id": str(uuid4()),
        }

        debug_info = None
        debug_request: dict[str, Any] = {}
        if self.agent_config.debug:
            # Never record credentials in debug output.
            hidden = {"authorization", self.agent_config.auth_header.lower()}
            debug_request = {
                "url": url,
                "method": "POST",
                "headers": {
                    name: "***" if name.lower() in hidden else value
                    for name, value in headers.items()
                },
                "body": request_data,
            }

        try:
            response = await client.post(
                url,
                json=request_data,
                headers=headers,
                timeout=self.agent_config.timeout,
            )
            response.raise_for_status()
            data = response.json()

            if self.agent_config.debug and start_time:
                duration_ms = (time.time() - start_time) * 1000
                debug_info = DebugInfo(
                    request=debug_request,
                    response={"status": response.status_code, "body": data},
                    duration_ms=duration_ms,
                )

            # Parse A2A response format
            # A2A tasks have lifecycle: submitted, working, completed, failed, input-required
            task = data.get("task") or {}  # tolerate "task": null
            task_status = task.get("status")
            task_id = task.get("id")

            # NOTE(review): "working" is reported as COMPLETED here, which looks
            # premature for an in-progress task - kept as-is, confirm intent.
            if task_status in ("completed", "working"):
                # Extract the result from the response message
                result_data = self._extract_result(data)
                return TaskResult[Any](
                    status=TaskStatus.COMPLETED,
                    data=result_data,
                    success=True,
                    metadata={"task_id": task_id},
                    debug_info=debug_info,
                )
            elif task_status == "failed":
                return TaskResult[Any](
                    status=TaskStatus.FAILED,
                    error=self._extract_error_text(data),
                    success=False,
                    debug_info=debug_info,
                )
            else:
                # Handle other states (submitted, input-required)
                return TaskResult[Any](
                    status=TaskStatus.SUBMITTED,
                    data=data,
                    success=True,
                    metadata={"task_id": task_id},
                    debug_info=debug_info,
                )

        except httpx.HTTPError as e:
            if self.agent_config.debug and start_time:
                duration_ms = (time.time() - start_time) * 1000
                debug_info = DebugInfo(
                    request=debug_request,
                    response={"error": str(e)},
                    duration_ms=duration_ms,
                )
            return TaskResult[Any](
                status=TaskStatus.FAILED,
                error=str(e),
                success=False,
                debug_info=debug_info,
            )

    def _format_tool_request(self, tool_name: str, params: dict[str, Any]) -> str:
        """Format tool request as natural language for A2A."""
        # For AdCP tools, we format as a structured request
        import json

        return f"Execute tool: {tool_name}\nParameters: {json.dumps(params, indent=2)}"

    def _extract_result(self, response_data: dict[str, Any]) -> Any:
        """Extract result data from A2A response.

        Returns the whole response when the message has no parts; for a text
        first part, the parsed JSON if the text is valid JSON, otherwise the
        raw text; for any other first part, the part itself.
        """
        import json

        # Try to extract structured data from response
        message = response_data.get("message", {})
        parts = message.get("parts", [])

        if not parts:
            return response_data

        # Return the first part's content
        first_part = parts[0]
        if first_part.get("type") == "text":
            # Try to parse as JSON if it looks like structured data
            text = first_part.get("text", "")
            try:
                return json.loads(text)
            except ValueError:  # json.JSONDecodeError is a ValueError subclass
                return text

        return first_part

    # ========================================================================
    # ADCP Protocol Methods
    # ========================================================================

    async def get_products(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Get advertising products."""
        return await self._call_a2a_tool("get_products", params)

    async def list_creative_formats(self, params: dict[str, Any]) -> TaskResult[Any]:
        """List supported creative formats."""
        return await self._call_a2a_tool("list_creative_formats", params)

    async def sync_creatives(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Sync creatives."""
        return await self._call_a2a_tool("sync_creatives", params)

    async def list_creatives(self, params: dict[str, Any]) -> TaskResult[Any]:
        """List creatives."""
        return await self._call_a2a_tool("list_creatives", params)

    async def get_media_buy_delivery(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Get media buy delivery."""
        return await self._call_a2a_tool("get_media_buy_delivery", params)

    async def list_authorized_properties(self, params: dict[str, Any]) -> TaskResult[Any]:
        """List authorized properties."""
        return await self._call_a2a_tool("list_authorized_properties", params)

    async def get_signals(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Get signals."""
        return await self._call_a2a_tool("get_signals", params)

    async def activate_signal(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Activate signal."""
        return await self._call_a2a_tool("activate_signal", params)

    async def provide_performance_feedback(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Provide performance feedback."""
        return await self._call_a2a_tool("provide_performance_feedback", params)

    async def preview_creative(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Generate preview URLs for a creative manifest."""
        return await self._call_a2a_tool("preview_creative", params)

    async def list_tools(self) -> list[str]:
        """
        List available tools from A2A agent.

        Note: A2A doesn't have a standard tools/list endpoint. Agents expose
        their capabilities through the agent card, so the tool names are the
        ``name`` of each entry in the card's ``skills`` list.

        Returns:
            Names of the skills listed in the agent card (entries without a
            name are skipped).

        Raises:
            ADCPAuthenticationError: The agent card request returned 401 or 403.
            ADCPTimeoutError: The agent card request timed out.
            ADCPConnectionError: Any other HTTP status or transport error.
        """
        client = await self._get_client()
        headers = self._build_headers()

        # Try to fetch agent card from standard A2A location
        # A2A spec uses /.well-known/agent.json for agent card
        url = self._endpoint(".well-known/agent.json")

        logger.debug(f"Fetching A2A agent card for {self.agent_config.id} from {url}")

        try:
            response = await client.get(url, headers=headers, timeout=self.agent_config.timeout)
            response.raise_for_status()

            data = response.json()

            # Extract skills from agent card; tolerate "skills": null and
            # malformed (non-dict) entries instead of crashing.
            skills = data.get("skills") or []
            tool_names = [
                skill["name"]
                for skill in skills
                if isinstance(skill, dict) and skill.get("name")
            ]

            logger.info(f"Found {len(tool_names)} tools from A2A agent {self.agent_config.id}")
            return tool_names

        # Order matters: HTTPStatusError and TimeoutException are both
        # subclasses of httpx.HTTPError, so the generic handler goes last.
        except httpx.HTTPStatusError as e:
            status_code = e.response.status_code
            if status_code in (401, 403):
                logger.error(f"Authentication failed for A2A agent {self.agent_config.id}")
                raise ADCPAuthenticationError(
                    f"Authentication failed: HTTP {status_code}",
                    agent_id=self.agent_config.id,
                    agent_uri=self.agent_config.agent_uri,
                ) from e
            else:
                logger.error(f"HTTP {status_code} error fetching agent card: {e}")
                raise ADCPConnectionError(
                    f"Failed to fetch agent card: HTTP {status_code}",
                    agent_id=self.agent_config.id,
                    agent_uri=self.agent_config.agent_uri,
                ) from e
        except httpx.TimeoutException as e:
            logger.error(f"Timeout fetching agent card for {self.agent_config.id}")
            raise ADCPTimeoutError(
                f"Timeout fetching agent card: {e}",
                agent_id=self.agent_config.id,
                agent_uri=self.agent_config.agent_uri,
                timeout=self.agent_config.timeout,
            ) from e
        except httpx.HTTPError as e:
            logger.error(f"HTTP error fetching agent card: {e}")
            raise ADCPConnectionError(
                f"Failed to fetch agent card: {e}",
                agent_id=self.agent_config.id,
                agent_uri=self.agent_config.agent_uri,
            ) from e
Initialize A2A adapter with reusable HTTP client.
Ancestors
- ProtocolAdapter
- abc.ABC
Methods
async def close(self) -> None
Expand source code
async def close(self) -> None:
    """Shut down the pooled HTTP client, if one was ever created.

    Does nothing when no client exists. After a successful close the cached
    client reference is cleared.
    """
    client = self._client
    if client is None:
        return

    logger.debug(f"Closing A2A adapter client for agent {self.agent_config.id}")
    await client.aclose()
    # Only forget the client once aclose() has returned without raising.
    self._client = None
async def list_tools(self) -> list[str]
Expand source code
async def list_tools(self) -> list[str]:
    """
    List available tools from A2A agent.

    Note: A2A doesn't have a standard tools/list endpoint. Agents expose
    their capabilities through the agent card, so the tool names are the
    ``name`` of each entry in the card's ``skills`` list.

    Returns:
        Names of the skills listed in the agent card (entries without a
        name are skipped).

    Raises:
        ADCPAuthenticationError: The agent card request returned 401 or 403.
        ADCPTimeoutError: The agent card request timed out.
        ADCPConnectionError: Any other HTTP status or transport error.
    """
    client = await self._get_client()

    headers = {"Content-Type": "application/json"}

    if self.agent_config.auth_token:
        # Support custom auth headers and types
        if self.agent_config.auth_type == "bearer":
            headers[self.agent_config.auth_header] = f"Bearer {self.agent_config.auth_token}"
        else:
            headers[self.agent_config.auth_header] = self.agent_config.auth_token

    # Try to fetch agent card from standard A2A location
    # A2A spec uses /.well-known/agent.json for agent card
    # Strip a trailing slash so the URL never contains "//.well-known".
    base_uri = str(self.agent_config.agent_uri).rstrip("/")
    url = f"{base_uri}/.well-known/agent.json"

    logger.debug(f"Fetching A2A agent card for {self.agent_config.id} from {url}")

    try:
        response = await client.get(url, headers=headers, timeout=self.agent_config.timeout)
        response.raise_for_status()

        data = response.json()

        # Extract skills from agent card; tolerate "skills": null and
        # malformed (non-dict) entries instead of crashing.
        skills = data.get("skills") or []
        tool_names = [
            skill["name"]
            for skill in skills
            if isinstance(skill, dict) and skill.get("name")
        ]

        logger.info(f"Found {len(tool_names)} tools from A2A agent {self.agent_config.id}")
        return tool_names

    # Order matters: HTTPStatusError and TimeoutException are both
    # subclasses of httpx.HTTPError, so the generic handler goes last.
    except httpx.HTTPStatusError as e:
        status_code = e.response.status_code
        if status_code in (401, 403):
            logger.error(f"Authentication failed for A2A agent {self.agent_config.id}")
            raise ADCPAuthenticationError(
                f"Authentication failed: HTTP {status_code}",
                agent_id=self.agent_config.id,
                agent_uri=self.agent_config.agent_uri,
            ) from e
        else:
            logger.error(f"HTTP {status_code} error fetching agent card: {e}")
            raise ADCPConnectionError(
                f"Failed to fetch agent card: HTTP {status_code}",
                agent_id=self.agent_config.id,
                agent_uri=self.agent_config.agent_uri,
            ) from e
    except httpx.TimeoutException as e:
        logger.error(f"Timeout fetching agent card for {self.agent_config.id}")
        raise ADCPTimeoutError(
            f"Timeout fetching agent card: {e}",
            agent_id=self.agent_config.id,
            agent_uri=self.agent_config.agent_uri,
            timeout=self.agent_config.timeout,
        ) from e
    except httpx.HTTPError as e:
        logger.error(f"HTTP error fetching agent card: {e}")
        raise ADCPConnectionError(
            f"Failed to fetch agent card: {e}",
            agent_id=self.agent_config.id,
            agent_uri=self.agent_config.agent_uri,
        ) from e
Note: A2A doesn't have a standard tools/list endpoint. Agents expose their capabilities through the agent card, so the tool names are read from the skills listed in the agent card at /.well-known/agent.json.
async def preview_creative(self, params: dict[str, Any]) -> TaskResult[Any]
Expand source code
async def preview_creative(self, params: dict[str, Any]) -> TaskResult[Any]:
    """Generate preview URLs for a creative manifest.

    Forwards ``params`` unchanged to the ``preview_creative`` tool through
    the adapter's A2A tool-call path and returns that call's result.
    """
    outcome = await self._call_a2a_tool("preview_creative", params)
    return outcome
Inherited members