Module adcp.protocols
Sub-modules
adcp.protocols.a2aadcp.protocols.baseadcp.protocols.mcp
Classes
class A2AAdapter (agent_config: AgentConfig)-
Expand source code
class A2AAdapter(ProtocolAdapter): """Adapter for A2A protocol using official a2a-sdk client.""" def __init__(self, agent_config: AgentConfig): """Initialize A2A adapter with official A2A client.""" super().__init__(agent_config) self._httpx_client: httpx.AsyncClient | None = None self._a2a_client: A2AClient | None = None async def _get_httpx_client(self) -> httpx.AsyncClient: """Get or create the HTTP client with connection pooling.""" if self._httpx_client is None: limits = httpx.Limits( max_keepalive_connections=10, max_connections=20, keepalive_expiry=30.0, ) headers = {} if self.agent_config.auth_token: if self.agent_config.auth_type == "bearer": headers["Authorization"] = f"Bearer {self.agent_config.auth_token}" else: headers[self.agent_config.auth_header] = self.agent_config.auth_token self._httpx_client = httpx.AsyncClient( limits=limits, headers=headers, timeout=self.agent_config.timeout, ) logger.debug( f"Created HTTP client with connection pooling for agent {self.agent_config.id}" ) return self._httpx_client async def _get_a2a_client(self) -> A2AClient: """Get or create the A2A client.""" if self._a2a_client is None: httpx_client = await self._get_httpx_client() # Use A2ACardResolver to fetch the agent card card_resolver = A2ACardResolver( httpx_client=httpx_client, base_url=self.agent_config.agent_uri, ) try: agent_card = await card_resolver.get_agent_card() logger.debug(f"Fetched agent card for {self.agent_config.id}") except httpx.HTTPStatusError as e: status_code = e.response.status_code if status_code in (401, 403): raise ADCPAuthenticationError( f"Authentication failed: HTTP {status_code}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from e else: raise ADCPConnectionError( f"Failed to fetch agent card: HTTP {status_code}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from e except httpx.TimeoutException as e: raise ADCPTimeoutError( f"Timeout fetching agent card: {e}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, timeout=self.agent_config.timeout, ) from e except httpx.HTTPError as e: raise ADCPConnectionError( f"Failed to fetch agent card: {e}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from e self._a2a_client = A2AClient( httpx_client=httpx_client, agent_card=agent_card, ) logger.debug(f"Created A2A client for agent {self.agent_config.id}") return self._a2a_client async def close(self) -> None: """Close the HTTP client and clean up resources.""" if self._httpx_client is not None: logger.debug(f"Closing A2A adapter client for agent {self.agent_config.id}") await self._httpx_client.aclose() self._httpx_client = None self._a2a_client = None async def _call_a2a_tool( self, tool_name: str, params: dict[str, Any], use_explicit_skill: bool = True ) -> TaskResult[Any]: """ Call a tool using A2A protocol via official a2a-sdk client. Args: tool_name: Name of the skill/tool to invoke params: Parameters to pass to the skill use_explicit_skill: If True, use explicit skill invocation (deterministic). If False, use natural language (flexible). The default is explicit skill invocation for predictable, repeatable behavior. See: https://docs.adcontextprotocol.org/docs/protocols/a2a-guide """ start_time = time.time() if self.agent_config.debug else None a2a_client = await self._get_a2a_client() # Build A2A message message_id = str(uuid4()) if use_explicit_skill: # Explicit skill invocation (deterministic) # Use DataPart with skill name and parameters data_part = DataPart( data={ "skill": tool_name, "parameters": params, } ) message = Message( message_id=message_id, role=Role.user, parts=[Part(root=data_part)], ) else: # Natural language invocation (flexible) # Agent interprets intent from text text_part = TextPart(text=self._format_tool_request(tool_name, params)) message = Message( message_id=message_id, role=Role.user, parts=[Part(root=text_part)], ) # Build request params params_obj = MessageSendParams(message=message) # Build request request = SendMessageRequest( id=str(uuid4()), params=params_obj, ) debug_info = None debug_request: dict[str, Any] = {} if self.agent_config.debug: debug_request = { "method": "send_message", "message_id": message_id, "tool": tool_name, "params": params, } try: # Use official A2A client sdk_response = await a2a_client.send_message(request) # SendMessageResponse is a RootModel union - unwrap it to get the actual response # (either JSONRPCSuccessResponse or JSONRPCErrorResponse) response = sdk_response.root if hasattr(sdk_response, "root") else sdk_response # Handle JSON-RPC error response if hasattr(response, "error"): error_msg = response.error.message if response.error.message else "Unknown error" if self.agent_config.debug and start_time: duration_ms = (time.time() - start_time) * 1000 debug_info = DebugInfo( request=debug_request, response={"error": response.error.model_dump()}, duration_ms=duration_ms, ) return TaskResult[Any]( status=TaskStatus.FAILED, error=error_msg, success=False, debug_info=debug_info, ) # Handle success response if hasattr(response, "result"): result = response.result if self.agent_config.debug and start_time: duration_ms = (time.time() - start_time) * 1000 debug_info = DebugInfo( request=debug_request, response={"result": result.model_dump()}, duration_ms=duration_ms, ) # Result can be either Task or Message if isinstance(result, Task): return self._process_task_response(result, debug_info) else: # Message response (shouldn't happen for send_message, but handle it) agent_id = self.agent_config.id logger.warning(f"Received Message instead of Task from A2A agent {agent_id}") return TaskResult[Any]( status=TaskStatus.COMPLETED, data=None, message="Received message response", success=True, debug_info=debug_info, ) # Shouldn't reach here return TaskResult[Any]( status=TaskStatus.FAILED, error="Invalid response from A2A client", success=False, debug_info=debug_info, ) except httpx.HTTPStatusError as e: status_code = e.response.status_code if self.agent_config.debug and start_time: duration_ms = (time.time() - start_time) * 1000 debug_info = DebugInfo( request=debug_request, response={"error": str(e), "status_code": status_code}, duration_ms=duration_ms, ) if status_code in (401, 403): error_msg = f"Authentication failed: HTTP {status_code}" else: error_msg = f"HTTP {status_code} error: {e}" return TaskResult[Any]( status=TaskStatus.FAILED, error=error_msg, success=False, debug_info=debug_info, ) except httpx.TimeoutException as e: if self.agent_config.debug and start_time: duration_ms = (time.time() - start_time) * 1000 debug_info = DebugInfo( request=debug_request, response={"error": str(e)}, duration_ms=duration_ms, ) return TaskResult[Any]( status=TaskStatus.FAILED, error=f"Timeout: {e}", success=False, debug_info=debug_info, ) except Exception as e: if self.agent_config.debug and start_time: duration_ms = (time.time() - start_time) * 1000 debug_info = DebugInfo( request=debug_request, response={"error": str(e)}, duration_ms=duration_ms, ) return TaskResult[Any]( status=TaskStatus.FAILED, error=str(e), success=False, debug_info=debug_info, ) def _process_task_response(self, task: Task, debug_info: DebugInfo | None) -> TaskResult[Any]: """Process a Task response from A2A into our TaskResult format.""" task_state = task.status.state if task_state == "completed": # Extract the result from the artifacts array result_data = self._extract_result_from_task(task) # Check for task-level errors in the payload errors = result_data.get("errors", []) if isinstance(result_data, dict) else [] has_errors = bool(errors) return TaskResult[Any]( status=TaskStatus.COMPLETED, data=result_data, message=self._extract_text_from_task(task), success=not has_errors, metadata={ "task_id": task.id, "context_id": task.context_id, }, debug_info=debug_info, ) elif task_state == "failed": # Protocol-level failure - extract error message from TextPart error_msg = self._extract_text_from_task(task) or "Task failed" return TaskResult[Any]( status=TaskStatus.FAILED, error=error_msg, success=False, debug_info=debug_info, ) else: # Handle all interim states (submitted, working, input-required, etc.) return TaskResult[Any]( status=TaskStatus.SUBMITTED, data=None, # Interim responses may not have structured AdCP content message=self._extract_text_from_task(task), success=True, metadata={ "task_id": task.id, "context_id": task.context_id, "status": task_state, }, debug_info=debug_info, ) def _format_tool_request(self, tool_name: str, params: dict[str, Any]) -> str: """Format tool request as natural language for A2A.""" import json return f"Execute tool: {tool_name}\nParameters: {json.dumps(params, indent=2)}" def _extract_result_from_task(self, task: Task) -> Any: """ Extract result data from A2A Task following canonical format. Per A2A response spec: - Responses MUST include at least one DataPart (kind: "data") - When multiple DataParts exist in an artifact, the last one is authoritative - When multiple artifacts exist, use the last one (most recent in streaming) - DataParts contain structured AdCP payload """ if not task.artifacts: logger.warning("A2A Task missing required artifacts array") return {} # Use last artifact (most recent in streaming scenarios) target_artifact = task.artifacts[-1] if not target_artifact.parts: logger.warning("A2A Task artifact has no parts") return {} # Find all DataParts (kind: "data") # Note: Parts are wrapped in a Part union type, access via .root from a2a.types import DataPart data_parts = [p.root for p in target_artifact.parts if isinstance(p.root, DataPart)] if not data_parts: logger.warning("A2A Task missing required DataPart (kind: 'data')") return {} # Use last DataPart as authoritative (handles streaming scenarios within an artifact) last_data_part = data_parts[-1] data = last_data_part.data # Some A2A implementations (e.g., ADK) wrap the response in {"response": {...}} # Unwrap it to get the actual AdCP payload if present if isinstance(data, dict) and "response" in data: # If response is the only key, unwrap completely if len(data) == 1: return data["response"] # If there are other keys alongside response, prefer the wrapped content return data["response"] return data def _extract_text_from_task(self, task: Task) -> str | None: """Extract human-readable message from TextPart if present.""" if not task.artifacts: return None # Use last artifact (most recent in streaming scenarios) target_artifact = task.artifacts[-1] # Find TextPart (kind: "text") # Note: Parts are wrapped in a Part union type, access via .root for part in target_artifact.parts: if isinstance(part.root, TextPart): return part.root.text return None # ======================================================================== # ADCP Protocol Methods # ======================================================================== async def get_products(self, params: dict[str, Any]) -> TaskResult[Any]: """Get advertising products.""" return await self._call_a2a_tool("get_products", params) async def list_creative_formats(self, params: dict[str, Any]) -> TaskResult[Any]: """List supported creative formats.""" return await self._call_a2a_tool("list_creative_formats", params) async def sync_creatives(self, params: dict[str, Any]) -> TaskResult[Any]: """Sync creatives.""" return await self._call_a2a_tool("sync_creatives", params) async def list_creatives(self, params: dict[str, Any]) -> TaskResult[Any]: """List creatives.""" return await self._call_a2a_tool("list_creatives", params) async def get_media_buy_delivery(self, params: dict[str, Any]) -> TaskResult[Any]: """Get media buy delivery.""" return await self._call_a2a_tool("get_media_buy_delivery", params) async def list_authorized_properties(self, params: dict[str, Any]) -> TaskResult[Any]: """List authorized properties.""" return await self._call_a2a_tool("list_authorized_properties", params) async def get_signals(self, params: dict[str, Any]) -> TaskResult[Any]: """Get signals.""" return await self._call_a2a_tool("get_signals", params) async def activate_signal(self, params: dict[str, Any]) -> TaskResult[Any]: """Activate signal.""" return await self._call_a2a_tool("activate_signal", params) async def provide_performance_feedback(self, params: dict[str, Any]) -> TaskResult[Any]: """Provide performance feedback.""" return await self._call_a2a_tool("provide_performance_feedback", params) async def preview_creative(self, params: dict[str, Any]) -> TaskResult[Any]: """Generate preview URLs for a creative manifest.""" return await self._call_a2a_tool("preview_creative", params) async def create_media_buy(self, params: dict[str, Any]) -> TaskResult[Any]: """Create media buy.""" return await self._call_a2a_tool("create_media_buy", params) async def update_media_buy(self, params: dict[str, Any]) -> TaskResult[Any]: """Update media buy.""" return await self._call_a2a_tool("update_media_buy", params) async def build_creative(self, params: dict[str, Any]) -> TaskResult[Any]: """Build creative.""" return await self._call_a2a_tool("build_creative", params) async def list_tools(self) -> list[str]: """ List available tools from A2A agent. Uses A2A client which already fetched the agent card during initialization. """ # Get the A2A client (which already fetched the agent card) a2a_client = await self._get_a2a_client() # Fetch the agent card using the official method try: agent_card = await a2a_client.get_card() # Extract skills from agent card tool_names = [skill.name for skill in agent_card.skills if skill.name] logger.info(f"Found {len(tool_names)} tools from A2A agent {self.agent_config.id}") return tool_names except httpx.HTTPStatusError as e: status_code = e.response.status_code if status_code in (401, 403): logger.error(f"Authentication failed for A2A agent {self.agent_config.id}") raise ADCPAuthenticationError( f"Authentication failed: HTTP {status_code}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from e else: logger.error(f"HTTP {status_code} error fetching agent card: {e}") raise ADCPConnectionError( f"Failed to fetch agent card: HTTP {status_code}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from e except httpx.TimeoutException as e: logger.error(f"Timeout fetching agent card for {self.agent_config.id}") raise ADCPTimeoutError( f"Timeout fetching agent card: {e}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, timeout=self.agent_config.timeout, ) from e except httpx.HTTPError as e: logger.error(f"HTTP error fetching agent card: {e}") raise ADCPConnectionError( f"Failed to fetch agent card: {e}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from e async def get_agent_info(self) -> dict[str, Any]: """ Get agent information including AdCP extension metadata from A2A agent card. Uses A2A client's get_card() method to fetch the agent card and extracts: - Basic agent info (name, description, version) - AdCP extension (extensions.adcp.adcp_version, extensions.adcp.protocols_supported) - Available skills/tools Returns: Dictionary with agent metadata """ # Get the A2A client (which already fetched the agent card) a2a_client = await self._get_a2a_client() logger.debug(f"Fetching A2A agent info for {self.agent_config.id}") try: agent_card = await a2a_client.get_card() # Extract basic info info: dict[str, Any] = { "name": agent_card.name, "description": agent_card.description, "version": agent_card.version, "protocol": "a2a", } # Extract skills/tools tool_names = [skill.name for skill in agent_card.skills if skill.name] if tool_names: info["tools"] = tool_names # Extract AdCP extension metadata # Note: AgentCard type doesn't include extensions in the SDK, # but it may be present at runtime extensions = getattr(agent_card, "extensions", None) if extensions: adcp_ext = extensions.get("adcp") if adcp_ext: info["adcp_version"] = adcp_ext.get("adcp_version") info["protocols_supported"] = adcp_ext.get("protocols_supported") logger.info(f"Retrieved agent info for {self.agent_config.id}") return info except httpx.HTTPStatusError as e: status_code = e.response.status_code if status_code in (401, 403): raise ADCPAuthenticationError( f"Authentication failed: HTTP {status_code}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from e else: raise ADCPConnectionError( f"Failed to fetch agent card: HTTP {status_code}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from e except httpx.TimeoutException as e: raise ADCPTimeoutError( f"Timeout fetching agent card: {e}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, timeout=self.agent_config.timeout, ) from e except httpx.HTTPError as e: raise ADCPConnectionError( f"Failed to fetch agent card: {e}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from eAdapter for A2A protocol using official a2a-sdk client.
Initialize A2A adapter with official A2A client.
Ancestors
- ProtocolAdapter
- abc.ABC
Methods
async def close(self) ‑> None-
Expand source code
async def close(self) -> None: """Close the HTTP client and clean up resources.""" if self._httpx_client is not None: logger.debug(f"Closing A2A adapter client for agent {self.agent_config.id}") await self._httpx_client.aclose() self._httpx_client = None self._a2a_client = NoneClose the HTTP client and clean up resources.
async def get_agent_info(self) ‑> dict[str, typing.Any]-
Expand source code
async def get_agent_info(self) -> dict[str, Any]: """ Get agent information including AdCP extension metadata from A2A agent card. Uses A2A client's get_card() method to fetch the agent card and extracts: - Basic agent info (name, description, version) - AdCP extension (extensions.adcp.adcp_version, extensions.adcp.protocols_supported) - Available skills/tools Returns: Dictionary with agent metadata """ # Get the A2A client (which already fetched the agent card) a2a_client = await self._get_a2a_client() logger.debug(f"Fetching A2A agent info for {self.agent_config.id}") try: agent_card = await a2a_client.get_card() # Extract basic info info: dict[str, Any] = { "name": agent_card.name, "description": agent_card.description, "version": agent_card.version, "protocol": "a2a", } # Extract skills/tools tool_names = [skill.name for skill in agent_card.skills if skill.name] if tool_names: info["tools"] = tool_names # Extract AdCP extension metadata # Note: AgentCard type doesn't include extensions in the SDK, # but it may be present at runtime extensions = getattr(agent_card, "extensions", None) if extensions: adcp_ext = extensions.get("adcp") if adcp_ext: info["adcp_version"] = adcp_ext.get("adcp_version") info["protocols_supported"] = adcp_ext.get("protocols_supported") logger.info(f"Retrieved agent info for {self.agent_config.id}") return info except httpx.HTTPStatusError as e: status_code = e.response.status_code if status_code in (401, 403): raise ADCPAuthenticationError( f"Authentication failed: HTTP {status_code}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from e else: raise ADCPConnectionError( f"Failed to fetch agent card: HTTP {status_code}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from e except httpx.TimeoutException as e: raise ADCPTimeoutError( f"Timeout fetching agent card: {e}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, timeout=self.agent_config.timeout, ) from e except httpx.HTTPError as e: raise ADCPConnectionError( f"Failed to fetch agent card: {e}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from eGet agent information including AdCP extension metadata from A2A agent card.
Uses A2A client's get_card() method to fetch the agent card and extracts: - Basic agent info (name, description, version) - AdCP extension (extensions.adcp.adcp_version, extensions.adcp.protocols_supported) - Available skills/tools
Returns
Dictionary with agent metadata
async def list_tools(self) ‑> list[str]-
Expand source code
async def list_tools(self) -> list[str]: """ List available tools from A2A agent. Uses A2A client which already fetched the agent card during initialization. """ # Get the A2A client (which already fetched the agent card) a2a_client = await self._get_a2a_client() # Fetch the agent card using the official method try: agent_card = await a2a_client.get_card() # Extract skills from agent card tool_names = [skill.name for skill in agent_card.skills if skill.name] logger.info(f"Found {len(tool_names)} tools from A2A agent {self.agent_config.id}") return tool_names except httpx.HTTPStatusError as e: status_code = e.response.status_code if status_code in (401, 403): logger.error(f"Authentication failed for A2A agent {self.agent_config.id}") raise ADCPAuthenticationError( f"Authentication failed: HTTP {status_code}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from e else: logger.error(f"HTTP {status_code} error fetching agent card: {e}") raise ADCPConnectionError( f"Failed to fetch agent card: HTTP {status_code}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from e except httpx.TimeoutException as e: logger.error(f"Timeout fetching agent card for {self.agent_config.id}") raise ADCPTimeoutError( f"Timeout fetching agent card: {e}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, timeout=self.agent_config.timeout, ) from e except httpx.HTTPError as e: logger.error(f"HTTP error fetching agent card: {e}") raise ADCPConnectionError( f"Failed to fetch agent card: {e}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from eList available tools from A2A agent.
Uses A2A client which already fetched the agent card during initialization.
async def preview_creative(self, params: dict[str, Any]) ‑> TaskResult[Any]-
Expand source code
async def preview_creative(self, params: dict[str, Any]) -> TaskResult[Any]: """Generate preview URLs for a creative manifest.""" return await self._call_a2a_tool("preview_creative", params)Generate preview URLs for a creative manifest.
Inherited members
class MCPAdapter (*args: Any, **kwargs: Any)-
Expand source code
class MCPAdapter(ProtocolAdapter): """Adapter for MCP protocol using official Python MCP SDK.""" def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) if not MCP_AVAILABLE: raise ImportError( "MCP SDK not installed. Install with: pip install mcp (requires Python 3.10+)" ) self._session: Any = None self._exit_stack: Any = None async def _cleanup_failed_connection(self, context: str) -> None: """ Clean up resources after a failed connection attempt. This method handles cleanup without raising exceptions to avoid masking the original connection error. Args: context: Description of the context for logging (e.g., "during connection attempt") """ if self._exit_stack is not None: old_stack = self._exit_stack self._exit_stack = None self._session = None try: await old_stack.aclose() except BaseException as cleanup_error: # Handle all cleanup errors including ExceptionGroup # Re-raise KeyboardInterrupt and SystemExit immediately if isinstance(cleanup_error, (KeyboardInterrupt, SystemExit)): raise if isinstance(cleanup_error, asyncio.CancelledError): logger.debug(f"MCP session cleanup cancelled {context}") return # Handle ExceptionGroup/BaseExceptionGroup from task group failures (Python 3.11+) # ExceptionGroup: for Exception subclasses (e.g., HTTPStatusError) # BaseExceptionGroup: for BaseException subclasses (e.g., CancelledError) # We need both because CancelledError is a BaseException, not an Exception is_exception_group = ( _ExceptionGroup is not None and isinstance(cleanup_error, _ExceptionGroup) ) or ( _BaseExceptionGroup is not None and isinstance(cleanup_error, _BaseExceptionGroup) ) if is_exception_group: # Check if all exceptions in the group are CancelledError # If so, treat the entire group as a cancellation all_cancelled = all( isinstance(exc, asyncio.CancelledError) for exc in cleanup_error.exceptions # type: ignore[attr-defined] ) if all_cancelled: logger.debug(f"MCP session cleanup cancelled {context}") return # Mixed group: skip CancelledErrors and log real errors exceptions = cleanup_error.exceptions # type: ignore[attr-defined] cancelled_errors = [ exc for exc in exceptions if isinstance(exc, asyncio.CancelledError) ] cancelled_count = len(cancelled_errors) if cancelled_count > 0: logger.debug( f"Skipping {cancelled_count} CancelledError(s) " f"in mixed exception group {context}" ) # Log each non-cancelled exception individually for exc in exceptions: if not isinstance(exc, asyncio.CancelledError): self._log_cleanup_error(exc, context) else: self._log_cleanup_error(cleanup_error, context) def _log_cleanup_error(self, exc: BaseException, context: str) -> None: """Log a cleanup error without raising.""" # Check for known cleanup error patterns from httpx/anyio exc_str = str(exc).lower() # Common cleanup errors that are expected when connection fails is_known_cleanup_error = ( isinstance(exc, RuntimeError) and ("cancel scope" in exc_str or "async context" in exc_str) ) or ( # HTTP errors during cleanup (if httpx is available) HTTPX_AVAILABLE and HTTPStatusError is not None and isinstance(exc, HTTPStatusError) ) if is_known_cleanup_error: # Expected cleanup errors - log at debug level without stack trace logger.debug(f"Ignoring expected cleanup error {context}: {exc}") else: # Truly unexpected cleanup errors - log at warning with full context logger.warning(f"Unexpected error during cleanup {context}: {exc}", exc_info=True) async def _get_session(self) -> ClientSession: """ Get or create MCP client session with URL fallback handling. Raises: ADCPConnectionError: If connection to agent fails """ if self._session is not None: return self._session # type: ignore[no-any-return] logger.debug(f"Creating MCP session for agent {self.agent_config.id}") # Parse the agent URI to determine transport type parsed = urlparse(self.agent_config.agent_uri) # Use SSE transport for HTTP/HTTPS endpoints if parsed.scheme in ("http", "https"): self._exit_stack = AsyncExitStack() # Create SSE client with authentication header headers = {} if self.agent_config.auth_token: # Support custom auth headers and types if self.agent_config.auth_type == "bearer": headers[self.agent_config.auth_header] = ( f"Bearer {self.agent_config.auth_token}" ) else: headers[self.agent_config.auth_header] = self.agent_config.auth_token # Try the user's exact URL first urls_to_try = [self.agent_config.agent_uri] # If URL doesn't end with /mcp, also try with /mcp suffix if not self.agent_config.agent_uri.rstrip("/").endswith("/mcp"): base_uri = self.agent_config.agent_uri.rstrip("/") urls_to_try.append(f"{base_uri}/mcp") last_error = None for url in urls_to_try: try: # Choose transport based on configuration if self.agent_config.mcp_transport == "streamable_http": # Use streamable HTTP transport (newer, bidirectional) read, write, _get_session_id = await self._exit_stack.enter_async_context( streamablehttp_client( url, headers=headers, timeout=self.agent_config.timeout ) ) else: # Use SSE transport (legacy, but widely supported) read, write = await self._exit_stack.enter_async_context( sse_client(url, headers=headers) ) self._session = await self._exit_stack.enter_async_context( _ClientSession(read, write) ) # Initialize the session await self._session.initialize() logger.info( f"Connected to MCP agent {self.agent_config.id} at {url} " f"using {self.agent_config.mcp_transport} transport" ) if url != self.agent_config.agent_uri: logger.info( f"Note: Connected using fallback URL {url} " f"(configured: {self.agent_config.agent_uri})" ) return self._session # type: ignore[no-any-return] except BaseException as e: # Catch BaseException to handle CancelledError from failed initialization # Re-raise KeyboardInterrupt and SystemExit immediately if isinstance(e, (KeyboardInterrupt, SystemExit)): raise last_error = e # Clean up the exit stack on failure to avoid resource leaks await self._cleanup_failed_connection("during connection attempt") # If this isn't the last URL to try, create a new exit stack and continue if url != urls_to_try[-1]: logger.debug(f"Retrying with next URL after error: {last_error}") self._exit_stack = AsyncExitStack() continue # If this was the last URL, raise the error logger.error( f"Failed to connect to MCP agent {self.agent_config.id} using " f"{self.agent_config.mcp_transport} transport. " f"Tried URLs: {', '.join(urls_to_try)}" ) # Classify error type for better exception handling error_str = str(last_error).lower() if "401" in error_str or "403" in error_str or "unauthorized" in error_str: from adcp.exceptions import ADCPAuthenticationError raise ADCPAuthenticationError( f"Authentication failed: {last_error}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from last_error elif "timeout" in error_str: raise ADCPTimeoutError( f"Connection timeout: {last_error}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, timeout=self.agent_config.timeout, ) from last_error else: raise ADCPConnectionError( f"Failed to connect: {last_error}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) from last_error # This shouldn't be reached, but just in case raise RuntimeError(f"Failed to connect to MCP agent at {self.agent_config.agent_uri}") else: raise ValueError(f"Unsupported transport scheme: {parsed.scheme}") def _serialize_mcp_content(self, content: list[Any]) -> list[dict[str, Any]]: """ Convert MCP SDK content objects to plain dicts. The MCP SDK returns Pydantic objects (TextContent, ImageContent, etc.) but the rest of the ADCP client expects protocol-agnostic dicts. This method handles the translation at the protocol boundary. Args: content: List of MCP content items (may be dicts or Pydantic objects) Returns: List of plain dicts representing the content """ result = [] for item in content: # Already a dict, pass through if isinstance(item, dict): result.append(item) # Pydantic v2 model with model_dump() elif hasattr(item, "model_dump"): result.append(item.model_dump()) # Pydantic v1 model with dict() elif hasattr(item, "dict") and callable(item.dict): result.append(item.dict()) # Fallback: try to access __dict__ elif hasattr(item, "__dict__"): result.append(dict(item.__dict__)) # Last resort: serialize as unknown type else: logger.warning(f"Unknown MCP content type: {type(item)}, serializing as string") result.append({"type": "unknown", "data": str(item)}) return result async def _call_mcp_tool(self, tool_name: str, params: dict[str, Any]) -> TaskResult[Any]: """Call a tool using MCP protocol.""" start_time = time.time() if self.agent_config.debug else None debug_info = None try: session = await self._get_session() if self.agent_config.debug: debug_request = { "protocol": "MCP", "tool": tool_name, "params": params, "transport": self.agent_config.mcp_transport, } # Call the tool using MCP client session result = await session.call_tool(tool_name, params) # Check if this is an error response is_error = hasattr(result, "isError") and result.isError # Extract human-readable message from content message_text = None if hasattr(result, "content") and result.content: serialized_content = self._serialize_mcp_content(result.content) if isinstance(serialized_content, list): for item in serialized_content: is_text = isinstance(item, dict) and item.get("type") == "text" if is_text and item.get("text"): message_text = item["text"] break # Handle error responses if is_error: # For error responses, structuredContent is optional # Use the error message from content as the error error_message = message_text or "Tool execution failed" if self.agent_config.debug and start_time: duration_ms = (time.time() - start_time) * 1000 debug_info = DebugInfo( request=debug_request, response={ "error": error_message, "is_error": True, }, duration_ms=duration_ms, ) return TaskResult[Any]( status=TaskStatus.FAILED, error=error_message, success=False, debug_info=debug_info, ) # For successful responses, structuredContent is required if not hasattr(result, "structuredContent") or result.structuredContent is None: raise ValueError( f"MCP tool {tool_name} did not return structuredContent. " f"This SDK requires MCP tools to provide structured responses " f"for successful calls. " f"Got content: {result.content if hasattr(result, 'content') else 'none'}" ) # Extract the structured data (required for success) data_to_return = result.structuredContent if self.agent_config.debug and start_time: duration_ms = (time.time() - start_time) * 1000 debug_info = DebugInfo( request=debug_request, response={ "data": data_to_return, "message": message_text, "is_error": False, }, duration_ms=duration_ms, ) # Return both the structured data and the human-readable message return TaskResult[Any]( status=TaskStatus.COMPLETED, data=data_to_return, message=message_text, success=True, debug_info=debug_info, ) except Exception as e: if self.agent_config.debug and start_time: duration_ms = (time.time() - start_time) * 1000 debug_info = DebugInfo( request=debug_request if self.agent_config.debug else {}, response={"error": str(e)}, duration_ms=duration_ms, ) return TaskResult[Any]( status=TaskStatus.FAILED, error=str(e), success=False, debug_info=debug_info, ) # ======================================================================== # ADCP Protocol Methods # ======================================================================== async def get_products(self, params: dict[str, Any]) -> TaskResult[Any]: """Get advertising products.""" return await self._call_mcp_tool("get_products", params) async def list_creative_formats(self, params: dict[str, Any]) -> TaskResult[Any]: """List supported creative formats.""" return await self._call_mcp_tool("list_creative_formats", params) async def sync_creatives(self, params: dict[str, Any]) -> TaskResult[Any]: """Sync creatives.""" return await self._call_mcp_tool("sync_creatives", params) async def list_creatives(self, params: dict[str, Any]) -> TaskResult[Any]: """List creatives.""" return await self._call_mcp_tool("list_creatives", params) async def get_media_buy_delivery(self, params: dict[str, Any]) -> TaskResult[Any]: """Get media buy delivery.""" return await self._call_mcp_tool("get_media_buy_delivery", params) async def list_authorized_properties(self, params: dict[str, Any]) -> TaskResult[Any]: """List authorized properties.""" return await self._call_mcp_tool("list_authorized_properties", params) async def get_signals(self, params: dict[str, Any]) -> TaskResult[Any]: """Get signals.""" return await self._call_mcp_tool("get_signals", params) async def activate_signal(self, params: dict[str, Any]) -> TaskResult[Any]: """Activate signal.""" return await self._call_mcp_tool("activate_signal", params) async def provide_performance_feedback(self, params: dict[str, Any]) -> TaskResult[Any]: """Provide performance feedback.""" return await self._call_mcp_tool("provide_performance_feedback", params) async def preview_creative(self, params: dict[str, Any]) -> TaskResult[Any]: """Generate preview URLs for a creative manifest.""" return await self._call_mcp_tool("preview_creative", params) async def create_media_buy(self, params: dict[str, Any]) -> TaskResult[Any]: """Create media buy.""" return await self._call_mcp_tool("create_media_buy", params) async def update_media_buy(self, params: dict[str, Any]) -> TaskResult[Any]: """Update media buy.""" return await self._call_mcp_tool("update_media_buy", params) async def build_creative(self, params: dict[str, Any]) -> TaskResult[Any]: """Build creative.""" return await self._call_mcp_tool("build_creative", params) async def list_tools(self) -> list[str]: """List available tools from MCP agent.""" session = await self._get_session() result = await session.list_tools() return [tool.name for tool in result.tools] async def get_agent_info(self) -> dict[str, Any]: """ Get agent information including AdCP extension metadata from MCP server. MCP servers may expose metadata through: - Server capabilities exposed during initialization - extensions.adcp in server info (if supported) - Tool list Returns: Dictionary with agent metadata """ session = await self._get_session() # Extract basic MCP server info info: dict[str, Any] = { "name": getattr(session, "server_name", None), "version": getattr(session, "server_version", None), "protocol": "mcp", } # Get available tools try: tools_result = await session.list_tools() tool_names = [tool.name for tool in tools_result.tools] if tool_names: info["tools"] = tool_names except Exception as e: logger.warning(f"Failed to list tools for {self.agent_config.id}: {e}") # Try to extract AdCP extension metadata from server capabilities # MCP servers may expose this in their initialization response if hasattr(session, "_server_capabilities"): capabilities = session._server_capabilities if isinstance(capabilities, dict): extensions = capabilities.get("extensions", {}) adcp_ext = extensions.get("adcp", {}) if adcp_ext: info["adcp_version"] = adcp_ext.get("adcp_version") info["protocols_supported"] = adcp_ext.get("protocols_supported") logger.info(f"Retrieved agent info for {self.agent_config.id}") return info async def close(self) -> None: """Close the MCP session and clean up resources.""" await self._cleanup_failed_connection("during close")Adapter for MCP protocol using official Python MCP SDK.
Initialize adapter with agent configuration.
Ancestors
- ProtocolAdapter
- abc.ABC
Methods
async def close(self) ‑> None-
Expand source code
async def close(self) -> None: """Close the MCP session and clean up resources.""" await self._cleanup_failed_connection("during close")Close the MCP session and clean up resources.
async def get_agent_info(self) ‑> dict[str, typing.Any]-
Expand source code
async def get_agent_info(self) -> dict[str, Any]: """ Get agent information including AdCP extension metadata from MCP server. MCP servers may expose metadata through: - Server capabilities exposed during initialization - extensions.adcp in server info (if supported) - Tool list Returns: Dictionary with agent metadata """ session = await self._get_session() # Extract basic MCP server info info: dict[str, Any] = { "name": getattr(session, "server_name", None), "version": getattr(session, "server_version", None), "protocol": "mcp", } # Get available tools try: tools_result = await session.list_tools() tool_names = [tool.name for tool in tools_result.tools] if tool_names: info["tools"] = tool_names except Exception as e: logger.warning(f"Failed to list tools for {self.agent_config.id}: {e}") # Try to extract AdCP extension metadata from server capabilities # MCP servers may expose this in their initialization response if hasattr(session, "_server_capabilities"): capabilities = session._server_capabilities if isinstance(capabilities, dict): extensions = capabilities.get("extensions", {}) adcp_ext = extensions.get("adcp", {}) if adcp_ext: info["adcp_version"] = adcp_ext.get("adcp_version") info["protocols_supported"] = adcp_ext.get("protocols_supported") logger.info(f"Retrieved agent info for {self.agent_config.id}") return infoGet agent information including AdCP extension metadata from MCP server.
MCP servers may expose metadata through: - Server capabilities exposed during initialization - extensions.adcp in server info (if supported) - Tool list
Returns
Dictionary with agent metadata
async def list_tools(self) ‑> list[str]-
Expand source code
async def list_tools(self) -> list[str]: """List available tools from MCP agent.""" session = await self._get_session() result = await session.list_tools() return [tool.name for tool in result.tools]List available tools from MCP agent.
async def preview_creative(self, params: dict[str, Any]) ‑> TaskResult[Any]-
Expand source code
async def preview_creative(self, params: dict[str, Any]) -> TaskResult[Any]: """Generate preview URLs for a creative manifest.""" return await self._call_mcp_tool("preview_creative", params)Generate preview URLs for a creative manifest.
Inherited members
class ProtocolAdapter (agent_config: AgentConfig)-
Expand source code
class ProtocolAdapter(ABC): """ Base class for protocol adapters. Each adapter implements the ADCP protocol methods and handles protocol-specific translation (MCP/A2A) while returning properly typed responses. """ def __init__(self, agent_config: AgentConfig): """Initialize adapter with agent configuration.""" self.agent_config = agent_config # ======================================================================== # Helper methods for response parsing # ======================================================================== def _parse_response( self, raw_result: TaskResult[Any], response_type: type[T] | Any ) -> TaskResult[T]: """ Parse raw TaskResult into typed TaskResult. Handles both MCP content arrays and A2A dict responses. Supports both single types and Union types (for oneOf discriminated unions). Args: raw_result: Raw TaskResult from adapter response_type: Expected Pydantic response type (can be a Union type) Returns: Typed TaskResult """ # Handle failed results or interim states without data # For A2A: interim states (submitted/working) have data=None but success=True # For MCP: completed tasks always have data, missing data indicates failure if not raw_result.success or raw_result.data is None: # If already marked as unsuccessful, preserve that # If successful but no data (A2A interim state), preserve success=True return TaskResult[T]( status=raw_result.status, data=None, message=raw_result.message, success=raw_result.success, # Preserve original success state error=raw_result.error, # Only use error if one was set metadata=raw_result.metadata, debug_info=raw_result.debug_info, ) try: # Handle MCP content arrays if isinstance(raw_result.data, list): parsed_data = parse_mcp_content(raw_result.data, response_type) else: # Handle A2A or direct responses parsed_data = parse_json_or_text(raw_result.data, response_type) return TaskResult[T]( status=raw_result.status, data=parsed_data, message=raw_result.message, # Preserve human-readable message from protocol success=raw_result.success, error=raw_result.error, metadata=raw_result.metadata, debug_info=raw_result.debug_info, ) except ValueError as e: # Parsing failed - return error result return TaskResult[T]( status=TaskStatus.FAILED, error=f"Failed to parse response: {e}", message=raw_result.message, success=False, debug_info=raw_result.debug_info, ) # ======================================================================== # ADCP Protocol Methods - Type-safe, spec-aligned interface # Each adapter MUST implement these methods explicitly. # ======================================================================== @abstractmethod async def get_products(self, params: dict[str, Any]) -> TaskResult[Any]: """Get advertising products.""" pass @abstractmethod async def list_creative_formats(self, params: dict[str, Any]) -> TaskResult[Any]: """List supported creative formats.""" pass @abstractmethod async def sync_creatives(self, params: dict[str, Any]) -> TaskResult[Any]: """Sync creatives.""" pass @abstractmethod async def list_creatives(self, params: dict[str, Any]) -> TaskResult[Any]: """List creatives.""" pass @abstractmethod async def get_media_buy_delivery(self, params: dict[str, Any]) -> TaskResult[Any]: """Get media buy delivery.""" pass @abstractmethod async def list_authorized_properties(self, params: dict[str, Any]) -> TaskResult[Any]: """List authorized properties.""" pass @abstractmethod async def get_signals(self, params: dict[str, Any]) -> TaskResult[Any]: """Get signals.""" pass @abstractmethod async def activate_signal(self, params: dict[str, Any]) -> TaskResult[Any]: """Activate signal.""" pass @abstractmethod async def provide_performance_feedback(self, params: dict[str, Any]) -> TaskResult[Any]: """Provide performance feedback.""" pass @abstractmethod async def create_media_buy(self, params: dict[str, Any]) -> TaskResult[Any]: """Create media buy.""" pass @abstractmethod async def update_media_buy(self, params: dict[str, Any]) -> TaskResult[Any]: """Update media buy.""" pass @abstractmethod async def build_creative(self, params: dict[str, Any]) -> TaskResult[Any]: """Build creative.""" pass @abstractmethod async def list_tools(self) -> list[str]: """ List available tools from the agent. Returns: List of tool names """ pass @abstractmethod async def get_agent_info(self) -> dict[str, Any]: """ Get agent information including AdCP extension metadata. Returns agent card information including: - Agent name, description, version - AdCP version (from extensions.adcp.adcp_version) - Supported protocols (from extensions.adcp.protocols_supported) - Available tools/skills Returns: Dictionary with agent metadata including AdCP extension fields """ pass @abstractmethod async def close(self) -> None: """ Close the adapter and clean up resources. Implementations should close any open connections, clients, or other resources. """ passBase class for protocol adapters.
Each adapter implements the ADCP protocol methods and handles protocol-specific translation (MCP/A2A) while returning properly typed responses.
Initialize adapter with agent configuration.
Ancestors
- abc.ABC
Subclasses
Methods
async def activate_signal(self, params: dict[str, Any]) ‑> TaskResult[Any]-
Expand source code
@abstractmethod async def activate_signal(self, params: dict[str, Any]) -> TaskResult[Any]: """Activate signal.""" passActivate signal.
async def build_creative(self, params: dict[str, Any]) ‑> TaskResult[Any]-
Expand source code
@abstractmethod async def build_creative(self, params: dict[str, Any]) -> TaskResult[Any]: """Build creative.""" passBuild creative.
async def close(self) ‑> None-
Expand source code
@abstractmethod async def close(self) -> None: """ Close the adapter and clean up resources. Implementations should close any open connections, clients, or other resources. """ passClose the adapter and clean up resources.
Implementations should close any open connections, clients, or other resources.
async def create_media_buy(self, params: dict[str, Any]) ‑> TaskResult[Any]-
Expand source code
@abstractmethod async def create_media_buy(self, params: dict[str, Any]) -> TaskResult[Any]: """Create media buy.""" passCreate media buy.
async def get_agent_info(self) ‑> dict[str, typing.Any]-
Expand source code
@abstractmethod async def get_agent_info(self) -> dict[str, Any]: """ Get agent information including AdCP extension metadata. Returns agent card information including: - Agent name, description, version - AdCP version (from extensions.adcp.adcp_version) - Supported protocols (from extensions.adcp.protocols_supported) - Available tools/skills Returns: Dictionary with agent metadata including AdCP extension fields """ passGet agent information including AdCP extension metadata.
Returns agent card information including: - Agent name, description, version - AdCP version (from extensions.adcp.adcp_version) - Supported protocols (from extensions.adcp.protocols_supported) - Available tools/skills
Returns
Dictionary with agent metadata including AdCP extension fields
async def get_media_buy_delivery(self, params: dict[str, Any]) ‑> TaskResult[Any]-
Expand source code
@abstractmethod async def get_media_buy_delivery(self, params: dict[str, Any]) -> TaskResult[Any]: """Get media buy delivery.""" passGet media buy delivery.
async def get_products(self, params: dict[str, Any]) ‑> TaskResult[Any]-
Expand source code
@abstractmethod async def get_products(self, params: dict[str, Any]) -> TaskResult[Any]: """Get advertising products.""" passGet advertising products.
async def get_signals(self, params: dict[str, Any]) ‑> TaskResult[Any]-
Expand source code
@abstractmethod async def get_signals(self, params: dict[str, Any]) -> TaskResult[Any]: """Get signals.""" passGet signals.
-
Expand source code
@abstractmethod async def list_authorized_properties(self, params: dict[str, Any]) -> TaskResult[Any]: """List authorized properties.""" passList authorized properties.
async def list_creative_formats(self, params: dict[str, Any]) ‑> TaskResult[Any]-
Expand source code
@abstractmethod async def list_creative_formats(self, params: dict[str, Any]) -> TaskResult[Any]: """List supported creative formats.""" passList supported creative formats.
async def list_creatives(self, params: dict[str, Any]) ‑> TaskResult[Any]-
Expand source code
@abstractmethod async def list_creatives(self, params: dict[str, Any]) -> TaskResult[Any]: """List creatives.""" passList creatives.
async def list_tools(self) ‑> list[str]-
Expand source code
@abstractmethod async def list_tools(self) -> list[str]: """ List available tools from the agent. Returns: List of tool names """ passList available tools from the agent.
Returns
List of tool names
async def provide_performance_feedback(self, params: dict[str, Any]) ‑> TaskResult[Any]-
Expand source code
@abstractmethod async def provide_performance_feedback(self, params: dict[str, Any]) -> TaskResult[Any]: """Provide performance feedback.""" passProvide performance feedback.
async def sync_creatives(self, params: dict[str, Any]) ‑> TaskResult[Any]-
Expand source code
@abstractmethod async def sync_creatives(self, params: dict[str, Any]) -> TaskResult[Any]: """Sync creatives.""" passSync creatives.
async def update_media_buy(self, params: dict[str, Any]) ‑> TaskResult[Any]-
Expand source code
@abstractmethod async def update_media_buy(self, params: dict[str, Any]) -> TaskResult[Any]: """Update media buy.""" passUpdate media buy.