Module adcp.protocols

Sub-modules

adcp.protocols.a2a
adcp.protocols.base
adcp.protocols.mcp

Classes

class A2AAdapter (agent_config: AgentConfig)
Expand source code
class A2AAdapter(ProtocolAdapter):
    """Adapter for A2A protocol following the Agent2Agent specification."""

    def __init__(self, agent_config: AgentConfig):
        """Initialize A2A adapter with reusable HTTP client."""
        super().__init__(agent_config)
        self._client: httpx.AsyncClient | None = None

    async def _get_client(self) -> httpx.AsyncClient:
        """Get or create the HTTP client with connection pooling."""
        if self._client is None:
            # Configure connection pooling for better performance
            limits = httpx.Limits(
                max_keepalive_connections=10,
                max_connections=20,
                keepalive_expiry=30.0,
            )
            self._client = httpx.AsyncClient(limits=limits)
            logger.debug(
                f"Created HTTP client with connection pooling for agent {self.agent_config.id}"
            )
        return self._client

    async def close(self) -> None:
        """Close the HTTP client and clean up resources."""
        if self._client is not None:
            logger.debug(f"Closing A2A adapter client for agent {self.agent_config.id}")
            await self._client.aclose()
            self._client = None

    async def _call_a2a_tool(self, tool_name: str, params: dict[str, Any]) -> TaskResult[Any]:
        """
        Call a tool using A2A protocol.

        A2A uses a tasks/send endpoint to initiate tasks. The agent responds with
        task status and may require multiple roundtrips for completion.
        """
        start_time = time.time() if self.agent_config.debug else None
        client = await self._get_client()

        headers = {"Content-Type": "application/json"}

        if self.agent_config.auth_token:
            # Support custom auth headers and types
            if self.agent_config.auth_type == "bearer":
                headers[self.agent_config.auth_header] = f"Bearer {self.agent_config.auth_token}"
            else:
                headers[self.agent_config.auth_header] = self.agent_config.auth_token

        # Construct A2A message
        message = {
            "role": "user",
            "parts": [
                {
                    "type": "text",
                    "text": self._format_tool_request(tool_name, params),
                }
            ],
        }

        # A2A uses message/send endpoint
        url = f"{self.agent_config.agent_uri}/message/send"

        request_data = {
            "message": message,
            "context_id": str(uuid4()),
        }

        debug_info = None
        if self.agent_config.debug:
            debug_request = {
                "url": url,
                "method": "POST",
                "headers": {
                    k: (
                        v
                        if k.lower() not in ("authorization", self.agent_config.auth_header.lower())
                        else "***"
                    )
                    for k, v in headers.items()
                },
                "body": request_data,
            }

        try:
            response = await client.post(
                url,
                json=request_data,
                headers=headers,
                timeout=self.agent_config.timeout,
            )
            response.raise_for_status()

            data = response.json()

            if self.agent_config.debug and start_time:
                duration_ms = (time.time() - start_time) * 1000
                debug_info = DebugInfo(
                    request=debug_request,
                    response={"status": response.status_code, "body": data},
                    duration_ms=duration_ms,
                )

            # Parse A2A response format
            # A2A tasks have lifecycle: submitted, working, completed, failed, input-required
            task_status = data.get("task", {}).get("status")

            if task_status in ("completed", "working"):
                # Extract the result from the response message
                result_data = self._extract_result(data)

                return TaskResult[Any](
                    status=TaskStatus.COMPLETED,
                    data=result_data,
                    success=True,
                    metadata={"task_id": data.get("task", {}).get("id")},
                    debug_info=debug_info,
                )
            elif task_status == "failed":
                return TaskResult[Any](
                    status=TaskStatus.FAILED,
                    error=data.get("message", {}).get("parts", [{}])[0].get("text", "Task failed"),
                    success=False,
                    debug_info=debug_info,
                )
            else:
                # Handle other states (submitted, input-required)
                return TaskResult[Any](
                    status=TaskStatus.SUBMITTED,
                    data=data,
                    success=True,
                    metadata={"task_id": data.get("task", {}).get("id")},
                    debug_info=debug_info,
                )

        except httpx.HTTPError as e:
            if self.agent_config.debug and start_time:
                duration_ms = (time.time() - start_time) * 1000
                debug_info = DebugInfo(
                    request=debug_request,
                    response={"error": str(e)},
                    duration_ms=duration_ms,
                )
            return TaskResult[Any](
                status=TaskStatus.FAILED,
                error=str(e),
                success=False,
                debug_info=debug_info,
            )

    def _format_tool_request(self, tool_name: str, params: dict[str, Any]) -> str:
        """Format tool request as natural language for A2A."""
        # For AdCP tools, we format as a structured request
        import json

        return f"Execute tool: {tool_name}\nParameters: {json.dumps(params, indent=2)}"

    def _extract_result(self, response_data: dict[str, Any]) -> Any:
        """Extract result data from A2A response."""
        # Try to extract structured data from response
        message = response_data.get("message", {})
        parts = message.get("parts", [])

        if not parts:
            return response_data

        # Return the first part's content
        first_part = parts[0]
        if first_part.get("type") == "text":
            # Try to parse as JSON if it looks like structured data
            text = first_part.get("text", "")
            try:
                import json

                return json.loads(text)
            except (json.JSONDecodeError, ValueError):
                return text

        return first_part

    # ========================================================================
    # ADCP Protocol Methods
    # ========================================================================

    async def get_products(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Get advertising products."""
        return await self._call_a2a_tool("get_products", params)

    async def list_creative_formats(self, params: dict[str, Any]) -> TaskResult[Any]:
        """List supported creative formats."""
        return await self._call_a2a_tool("list_creative_formats", params)

    async def sync_creatives(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Sync creatives."""
        return await self._call_a2a_tool("sync_creatives", params)

    async def list_creatives(self, params: dict[str, Any]) -> TaskResult[Any]:
        """List creatives."""
        return await self._call_a2a_tool("list_creatives", params)

    async def get_media_buy_delivery(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Get media buy delivery."""
        return await self._call_a2a_tool("get_media_buy_delivery", params)

    async def list_authorized_properties(self, params: dict[str, Any]) -> TaskResult[Any]:
        """List authorized properties."""
        return await self._call_a2a_tool("list_authorized_properties", params)

    async def get_signals(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Get signals."""
        return await self._call_a2a_tool("get_signals", params)

    async def activate_signal(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Activate signal."""
        return await self._call_a2a_tool("activate_signal", params)

    async def provide_performance_feedback(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Provide performance feedback."""
        return await self._call_a2a_tool("provide_performance_feedback", params)

    async def preview_creative(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Generate preview URLs for a creative manifest."""
        return await self._call_a2a_tool("preview_creative", params)

    async def list_tools(self) -> list[str]:
        """
        List available tools from A2A agent.

        Note: A2A doesn't have a standard tools/list endpoint. Agents expose
        their capabilities through the agent card. For AdCP, we rely on the
        standard AdCP tool set.
        """
        client = await self._get_client()

        headers = {"Content-Type": "application/json"}

        if self.agent_config.auth_token:
            # Support custom auth headers and types
            if self.agent_config.auth_type == "bearer":
                headers[self.agent_config.auth_header] = f"Bearer {self.agent_config.auth_token}"
            else:
                headers[self.agent_config.auth_header] = self.agent_config.auth_token

        # Try to fetch agent card from standard A2A location
        # A2A spec uses /.well-known/agent.json for agent card
        url = f"{self.agent_config.agent_uri}/.well-known/agent.json"

        logger.debug(f"Fetching A2A agent card for {self.agent_config.id} from {url}")

        try:
            response = await client.get(url, headers=headers, timeout=self.agent_config.timeout)
            response.raise_for_status()

            data = response.json()

            # Extract skills from agent card
            skills = data.get("skills", [])
            tool_names = [skill.get("name", "") for skill in skills if skill.get("name")]

            logger.info(f"Found {len(tool_names)} tools from A2A agent {self.agent_config.id}")
            return tool_names

        except httpx.HTTPStatusError as e:
            status_code = e.response.status_code
            if status_code in (401, 403):
                logger.error(f"Authentication failed for A2A agent {self.agent_config.id}")
                raise ADCPAuthenticationError(
                    f"Authentication failed: HTTP {status_code}",
                    agent_id=self.agent_config.id,
                    agent_uri=self.agent_config.agent_uri,
                ) from e
            else:
                logger.error(f"HTTP {status_code} error fetching agent card: {e}")
                raise ADCPConnectionError(
                    f"Failed to fetch agent card: HTTP {status_code}",
                    agent_id=self.agent_config.id,
                    agent_uri=self.agent_config.agent_uri,
                ) from e
        except httpx.TimeoutException as e:
            logger.error(f"Timeout fetching agent card for {self.agent_config.id}")
            raise ADCPTimeoutError(
                f"Timeout fetching agent card: {e}",
                agent_id=self.agent_config.id,
                agent_uri=self.agent_config.agent_uri,
                timeout=self.agent_config.timeout,
            ) from e
        except httpx.HTTPError as e:
            logger.error(f"HTTP error fetching agent card: {e}")
            raise ADCPConnectionError(
                f"Failed to fetch agent card: {e}",
                agent_id=self.agent_config.id,
                agent_uri=self.agent_config.agent_uri,
            ) from e

Adapter for A2A protocol following the Agent2Agent specification.

Initialize A2A adapter with reusable HTTP client.

Ancestors

Methods

async def close(self) ‑> None
Expand source code
async def close(self) -> None:
    """Close the HTTP client and clean up resources."""
    if self._client is not None:
        logger.debug(f"Closing A2A adapter client for agent {self.agent_config.id}")
        await self._client.aclose()
        self._client = None

Close the HTTP client and clean up resources.

async def list_tools(self) ‑> list[str]
Expand source code
async def list_tools(self) -> list[str]:
    """
    List available tools from A2A agent.

    Note: A2A doesn't have a standard tools/list endpoint. Agents expose
    their capabilities through the agent card. For AdCP, we rely on the
    standard AdCP tool set.
    """
    client = await self._get_client()

    headers = {"Content-Type": "application/json"}

    if self.agent_config.auth_token:
        # Support custom auth headers and types
        if self.agent_config.auth_type == "bearer":
            headers[self.agent_config.auth_header] = f"Bearer {self.agent_config.auth_token}"
        else:
            headers[self.agent_config.auth_header] = self.agent_config.auth_token

    # Try to fetch agent card from standard A2A location
    # A2A spec uses /.well-known/agent.json for agent card
    url = f"{self.agent_config.agent_uri}/.well-known/agent.json"

    logger.debug(f"Fetching A2A agent card for {self.agent_config.id} from {url}")

    try:
        response = await client.get(url, headers=headers, timeout=self.agent_config.timeout)
        response.raise_for_status()

        data = response.json()

        # Extract skills from agent card
        skills = data.get("skills", [])
        tool_names = [skill.get("name", "") for skill in skills if skill.get("name")]

        logger.info(f"Found {len(tool_names)} tools from A2A agent {self.agent_config.id}")
        return tool_names

    except httpx.HTTPStatusError as e:
        status_code = e.response.status_code
        if status_code in (401, 403):
            logger.error(f"Authentication failed for A2A agent {self.agent_config.id}")
            raise ADCPAuthenticationError(
                f"Authentication failed: HTTP {status_code}",
                agent_id=self.agent_config.id,
                agent_uri=self.agent_config.agent_uri,
            ) from e
        else:
            logger.error(f"HTTP {status_code} error fetching agent card: {e}")
            raise ADCPConnectionError(
                f"Failed to fetch agent card: HTTP {status_code}",
                agent_id=self.agent_config.id,
                agent_uri=self.agent_config.agent_uri,
            ) from e
    except httpx.TimeoutException as e:
        logger.error(f"Timeout fetching agent card for {self.agent_config.id}")
        raise ADCPTimeoutError(
            f"Timeout fetching agent card: {e}",
            agent_id=self.agent_config.id,
            agent_uri=self.agent_config.agent_uri,
            timeout=self.agent_config.timeout,
        ) from e
    except httpx.HTTPError as e:
        logger.error(f"HTTP error fetching agent card: {e}")
        raise ADCPConnectionError(
            f"Failed to fetch agent card: {e}",
            agent_id=self.agent_config.id,
            agent_uri=self.agent_config.agent_uri,
        ) from e

List available tools from A2A agent.

Note: A2A doesn't have a standard tools/list endpoint. Agents expose their capabilities through the agent card. For AdCP, we rely on the standard AdCP tool set.

async def preview_creative(self, params: dict[str, Any]) ‑> TaskResult[Any]
Expand source code
async def preview_creative(self, params: dict[str, Any]) -> TaskResult[Any]:
    """Generate preview URLs for a creative manifest."""
    return await self._call_a2a_tool("preview_creative", params)

Generate preview URLs for a creative manifest.

Inherited members

class MCPAdapter (*args: Any, **kwargs: Any)
Expand source code
class MCPAdapter(ProtocolAdapter):
    """Adapter for MCP protocol using official Python MCP SDK."""

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)
        if not MCP_AVAILABLE:
            raise ImportError(
                "MCP SDK not installed. Install with: pip install mcp (requires Python 3.10+)"
            )
        self._session: Any = None
        self._exit_stack: Any = None

    async def _cleanup_failed_connection(self, context: str) -> None:
        """
        Clean up resources after a failed connection attempt.

        This method handles cleanup without raising exceptions to avoid
        masking the original connection error.

        Args:
            context: Description of the context for logging (e.g., "during connection attempt")
        """
        if self._exit_stack is not None:
            old_stack = self._exit_stack
            self._exit_stack = None
            self._session = None
            try:
                await old_stack.aclose()
            except asyncio.CancelledError:
                logger.debug(f"MCP session cleanup cancelled {context}")
            except RuntimeError as cleanup_error:
                # Known anyio task group cleanup issue
                error_msg = str(cleanup_error).lower()
                if "cancel scope" in error_msg or "async context" in error_msg:
                    logger.debug(f"Ignoring anyio cleanup error {context}: {cleanup_error}")
                else:
                    logger.warning(
                        f"Unexpected RuntimeError during cleanup {context}: {cleanup_error}"
                    )
            except Exception as cleanup_error:
                # Log unexpected cleanup errors but don't raise to preserve original error
                logger.warning(
                    f"Unexpected error during cleanup {context}: {cleanup_error}", exc_info=True
                )

    async def _get_session(self) -> ClientSession:
        """
        Get or create MCP client session with URL fallback handling.

        Raises:
            ADCPConnectionError: If connection to agent fails
        """
        if self._session is not None:
            return self._session  # type: ignore[no-any-return]

        logger.debug(f"Creating MCP session for agent {self.agent_config.id}")

        # Parse the agent URI to determine transport type
        parsed = urlparse(self.agent_config.agent_uri)

        # Use SSE transport for HTTP/HTTPS endpoints
        if parsed.scheme in ("http", "https"):
            self._exit_stack = AsyncExitStack()

            # Create SSE client with authentication header
            headers = {}
            if self.agent_config.auth_token:
                # Support custom auth headers and types
                if self.agent_config.auth_type == "bearer":
                    headers[self.agent_config.auth_header] = (
                        f"Bearer {self.agent_config.auth_token}"
                    )
                else:
                    headers[self.agent_config.auth_header] = self.agent_config.auth_token

            # Try the user's exact URL first
            urls_to_try = [self.agent_config.agent_uri]

            # If URL doesn't end with /mcp, also try with /mcp suffix
            if not self.agent_config.agent_uri.rstrip("/").endswith("/mcp"):
                base_uri = self.agent_config.agent_uri.rstrip("/")
                urls_to_try.append(f"{base_uri}/mcp")

            last_error = None
            for url in urls_to_try:
                try:
                    # Choose transport based on configuration
                    if self.agent_config.mcp_transport == "streamable_http":
                        # Use streamable HTTP transport (newer, bidirectional)
                        read, write, _get_session_id = await self._exit_stack.enter_async_context(
                            streamablehttp_client(
                                url, headers=headers, timeout=self.agent_config.timeout
                            )
                        )
                    else:
                        # Use SSE transport (legacy, but widely supported)
                        read, write = await self._exit_stack.enter_async_context(
                            sse_client(url, headers=headers)
                        )

                    self._session = await self._exit_stack.enter_async_context(
                        _ClientSession(read, write)
                    )

                    # Initialize the session
                    await self._session.initialize()

                    logger.info(
                        f"Connected to MCP agent {self.agent_config.id} at {url} "
                        f"using {self.agent_config.mcp_transport} transport"
                    )
                    if url != self.agent_config.agent_uri:
                        logger.info(
                            f"Note: Connected using fallback URL {url} "
                            f"(configured: {self.agent_config.agent_uri})"
                        )

                    return self._session  # type: ignore[no-any-return]
                except Exception as e:
                    last_error = e
                    # Clean up the exit stack on failure to avoid resource leaks
                    await self._cleanup_failed_connection("during connection attempt")

                    # If this isn't the last URL to try, create a new exit stack and continue
                    if url != urls_to_try[-1]:
                        logger.debug(f"Retrying with next URL after error: {last_error}")
                        self._exit_stack = AsyncExitStack()
                        continue
                    # If this was the last URL, raise the error
                    logger.error(
                        f"Failed to connect to MCP agent {self.agent_config.id} using "
                        f"{self.agent_config.mcp_transport} transport. "
                        f"Tried URLs: {', '.join(urls_to_try)}"
                    )

                    # Classify error type for better exception handling
                    error_str = str(last_error).lower()
                    if "401" in error_str or "403" in error_str or "unauthorized" in error_str:
                        from adcp.exceptions import ADCPAuthenticationError

                        raise ADCPAuthenticationError(
                            f"Authentication failed: {last_error}",
                            agent_id=self.agent_config.id,
                            agent_uri=self.agent_config.agent_uri,
                        ) from last_error
                    elif "timeout" in error_str:
                        raise ADCPTimeoutError(
                            f"Connection timeout: {last_error}",
                            agent_id=self.agent_config.id,
                            agent_uri=self.agent_config.agent_uri,
                            timeout=self.agent_config.timeout,
                        ) from last_error
                    else:
                        raise ADCPConnectionError(
                            f"Failed to connect: {last_error}",
                            agent_id=self.agent_config.id,
                            agent_uri=self.agent_config.agent_uri,
                        ) from last_error

            # This shouldn't be reached, but just in case
            raise RuntimeError(f"Failed to connect to MCP agent at {self.agent_config.agent_uri}")
        else:
            raise ValueError(f"Unsupported transport scheme: {parsed.scheme}")

    def _serialize_mcp_content(self, content: list[Any]) -> list[dict[str, Any]]:
        """
        Convert MCP SDK content objects to plain dicts.

        The MCP SDK returns Pydantic objects (TextContent, ImageContent, etc.)
        but the rest of the ADCP client expects protocol-agnostic dicts.
        This method handles the translation at the protocol boundary.

        Args:
            content: List of MCP content items (may be dicts or Pydantic objects)

        Returns:
            List of plain dicts representing the content
        """
        result = []
        for item in content:
            # Already a dict, pass through
            if isinstance(item, dict):
                result.append(item)
            # Pydantic v2 model with model_dump()
            elif hasattr(item, "model_dump"):
                result.append(item.model_dump())
            # Pydantic v1 model with dict()
            elif hasattr(item, "dict") and callable(item.dict):
                result.append(item.dict())
            # Fallback: try to access __dict__
            elif hasattr(item, "__dict__"):
                result.append(dict(item.__dict__))
            # Last resort: serialize as unknown type
            else:
                logger.warning(f"Unknown MCP content type: {type(item)}, serializing as string")
                result.append({"type": "unknown", "data": str(item)})
        return result

    async def _call_mcp_tool(self, tool_name: str, params: dict[str, Any]) -> TaskResult[Any]:
        """Call a tool using MCP protocol."""
        start_time = time.time() if self.agent_config.debug else None
        debug_info = None

        try:
            session = await self._get_session()

            if self.agent_config.debug:
                debug_request = {
                    "protocol": "MCP",
                    "tool": tool_name,
                    "params": params,
                    "transport": self.agent_config.mcp_transport,
                }

            # Call the tool using MCP client session
            result = await session.call_tool(tool_name, params)

            # Check if this is an error response
            is_error = hasattr(result, "isError") and result.isError

            # Extract human-readable message from content
            message_text = None
            if hasattr(result, "content") and result.content:
                serialized_content = self._serialize_mcp_content(result.content)
                if isinstance(serialized_content, list):
                    for item in serialized_content:
                        is_text = isinstance(item, dict) and item.get("type") == "text"
                        if is_text and item.get("text"):
                            message_text = item["text"]
                            break

            # Handle error responses
            if is_error:
                # For error responses, structuredContent is optional
                # Use the error message from content as the error
                error_message = message_text or "Tool execution failed"
                if self.agent_config.debug and start_time:
                    duration_ms = (time.time() - start_time) * 1000
                    debug_info = DebugInfo(
                        request=debug_request,
                        response={
                            "error": error_message,
                            "is_error": True,
                        },
                        duration_ms=duration_ms,
                    )
                return TaskResult[Any](
                    status=TaskStatus.FAILED,
                    error=error_message,
                    success=False,
                    debug_info=debug_info,
                )

            # For successful responses, structuredContent is required
            if not hasattr(result, "structuredContent") or result.structuredContent is None:
                raise ValueError(
                    f"MCP tool {tool_name} did not return structuredContent. "
                    f"This SDK requires MCP tools to provide structured responses "
                    f"for successful calls. "
                    f"Got content: {result.content if hasattr(result, 'content') else 'none'}"
                )

            # Extract the structured data (required for success)
            data_to_return = result.structuredContent

            if self.agent_config.debug and start_time:
                duration_ms = (time.time() - start_time) * 1000
                debug_info = DebugInfo(
                    request=debug_request,
                    response={
                        "data": data_to_return,
                        "message": message_text,
                        "is_error": False,
                    },
                    duration_ms=duration_ms,
                )

            # Return both the structured data and the human-readable message
            return TaskResult[Any](
                status=TaskStatus.COMPLETED,
                data=data_to_return,
                message=message_text,
                success=True,
                debug_info=debug_info,
            )

        except Exception as e:
            if self.agent_config.debug and start_time:
                duration_ms = (time.time() - start_time) * 1000
                debug_info = DebugInfo(
                    request=debug_request if self.agent_config.debug else {},
                    response={"error": str(e)},
                    duration_ms=duration_ms,
                )
            return TaskResult[Any](
                status=TaskStatus.FAILED,
                error=str(e),
                success=False,
                debug_info=debug_info,
            )

    # ========================================================================
    # ADCP Protocol Methods
    # ========================================================================

    async def get_products(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Get advertising products."""
        return await self._call_mcp_tool("get_products", params)

    async def list_creative_formats(self, params: dict[str, Any]) -> TaskResult[Any]:
        """List supported creative formats."""
        return await self._call_mcp_tool("list_creative_formats", params)

    async def sync_creatives(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Sync creatives."""
        return await self._call_mcp_tool("sync_creatives", params)

    async def list_creatives(self, params: dict[str, Any]) -> TaskResult[Any]:
        """List creatives."""
        return await self._call_mcp_tool("list_creatives", params)

    async def get_media_buy_delivery(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Get media buy delivery."""
        return await self._call_mcp_tool("get_media_buy_delivery", params)

    async def list_authorized_properties(self, params: dict[str, Any]) -> TaskResult[Any]:
        """List authorized properties."""
        return await self._call_mcp_tool("list_authorized_properties", params)

    async def get_signals(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Get signals."""
        return await self._call_mcp_tool("get_signals", params)

    async def activate_signal(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Activate signal."""
        return await self._call_mcp_tool("activate_signal", params)

    async def provide_performance_feedback(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Provide performance feedback."""
        return await self._call_mcp_tool("provide_performance_feedback", params)

    async def preview_creative(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Generate preview URLs for a creative manifest."""
        return await self._call_mcp_tool("preview_creative", params)

    async def list_tools(self) -> list[str]:
        """List available tools from MCP agent."""
        session = await self._get_session()
        result = await session.list_tools()
        return [tool.name for tool in result.tools]

    async def close(self) -> None:
        """Close the MCP session and clean up resources."""
        await self._cleanup_failed_connection("during close")

Adapter for MCP protocol using official Python MCP SDK.

Initialize adapter with agent configuration.

Ancestors

Methods

async def close(self) ‑> None
Expand source code
async def close(self) -> None:
    """Close the MCP session and clean up resources."""
    await self._cleanup_failed_connection("during close")

Close the MCP session and clean up resources.

async def list_tools(self) ‑> list[str]
Expand source code
async def list_tools(self) -> list[str]:
    """List available tools from MCP agent."""
    session = await self._get_session()
    result = await session.list_tools()
    return [tool.name for tool in result.tools]

List available tools from MCP agent.

async def preview_creative(self, params: dict[str, Any]) ‑> TaskResult[Any]
Expand source code
async def preview_creative(self, params: dict[str, Any]) -> TaskResult[Any]:
    """Generate preview URLs for a creative manifest."""
    return await self._call_mcp_tool("preview_creative", params)

Generate preview URLs for a creative manifest.

Inherited members

class ProtocolAdapter (agent_config: AgentConfig)
Expand source code
class ProtocolAdapter(ABC):
    """
    Base class for protocol adapters.

    Each adapter implements the ADCP protocol methods and handles
    protocol-specific translation (MCP/A2A) while returning properly
    typed responses.
    """

    def __init__(self, agent_config: AgentConfig):
        """Initialize adapter with agent configuration."""
        self.agent_config = agent_config

    # ========================================================================
    # Helper methods for response parsing
    # ========================================================================

    def _parse_response(
        self, raw_result: TaskResult[Any], response_type: type[T] | Any
    ) -> TaskResult[T]:
        """
        Parse raw TaskResult into typed TaskResult.

        Handles both MCP content arrays and A2A dict responses.
        Supports both single types and Union types (for oneOf discriminated unions).

        Args:
            raw_result: Raw TaskResult from adapter
            response_type: Expected Pydantic response type (can be a Union type)

        Returns:
            Typed TaskResult
        """
        # Handle failed results or missing data
        if not raw_result.success or raw_result.data is None:
            # Explicitly construct typed result to satisfy type checker
            return TaskResult[T](
                status=raw_result.status,
                data=None,
                message=raw_result.message,
                success=False,
                error=raw_result.error or "No data returned from adapter",
                metadata=raw_result.metadata,
                debug_info=raw_result.debug_info,
            )

        try:
            # Handle MCP content arrays
            if isinstance(raw_result.data, list):
                parsed_data = parse_mcp_content(raw_result.data, response_type)
            else:
                # Handle A2A or direct responses
                parsed_data = parse_json_or_text(raw_result.data, response_type)

            return TaskResult[T](
                status=raw_result.status,
                data=parsed_data,
                message=raw_result.message,  # Preserve human-readable message from protocol
                success=raw_result.success,
                error=raw_result.error,
                metadata=raw_result.metadata,
                debug_info=raw_result.debug_info,
            )
        except ValueError as e:
            # Parsing failed - return error result
            return TaskResult[T](
                status=TaskStatus.FAILED,
                error=f"Failed to parse response: {e}",
                message=raw_result.message,
                success=False,
                debug_info=raw_result.debug_info,
            )

    # ========================================================================
    # ADCP Protocol Methods - Type-safe, spec-aligned interface
    # Each adapter MUST implement these methods explicitly.
    # ========================================================================

    @abstractmethod
    async def get_products(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Get advertising products."""
        pass

    @abstractmethod
    async def list_creative_formats(self, params: dict[str, Any]) -> TaskResult[Any]:
        """List supported creative formats."""
        pass

    @abstractmethod
    async def sync_creatives(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Sync creatives."""
        pass

    @abstractmethod
    async def list_creatives(self, params: dict[str, Any]) -> TaskResult[Any]:
        """List creatives."""
        pass

    @abstractmethod
    async def get_media_buy_delivery(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Get media buy delivery."""
        pass

    @abstractmethod
    async def list_authorized_properties(self, params: dict[str, Any]) -> TaskResult[Any]:
        """List authorized properties."""
        pass

    @abstractmethod
    async def get_signals(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Get signals."""
        pass

    @abstractmethod
    async def activate_signal(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Activate signal."""
        pass

    @abstractmethod
    async def provide_performance_feedback(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Provide performance feedback."""
        pass

    @abstractmethod
    async def list_tools(self) -> list[str]:
        """
        List available tools from the agent.

        Returns:
            List of tool names
        """
        pass

    @abstractmethod
    async def close(self) -> None:
        """
        Close the adapter and clean up resources.

        Implementations should close any open connections, clients, or other resources.
        """
        pass

Base class for protocol adapters.

Each adapter implements the ADCP protocol methods and handles protocol-specific translation (MCP/A2A) while returning properly typed responses.

Initialize adapter with agent configuration.

Ancestors

  • abc.ABC

Subclasses

Methods

async def activate_signal(self, params: dict[str, Any]) ‑> TaskResult[Any]
Expand source code
@abstractmethod
async def activate_signal(self, params: dict[str, Any]) -> TaskResult[Any]:
    """Activate signal."""
    pass

Activate signal.

async def close(self) ‑> None
Expand source code
@abstractmethod
async def close(self) -> None:
    """
    Close the adapter and clean up resources.

    Implementations should close any open connections, clients, or other resources.
    """
    pass

Close the adapter and clean up resources.

Implementations should close any open connections, clients, or other resources.

async def get_media_buy_delivery(self, params: dict[str, Any]) ‑> TaskResult[Any]
Expand source code
@abstractmethod
async def get_media_buy_delivery(self, params: dict[str, Any]) -> TaskResult[Any]:
    """Get media buy delivery."""
    pass

Get media buy delivery.

async def get_products(self, params: dict[str, Any]) ‑> TaskResult[Any]
Expand source code
@abstractmethod
async def get_products(self, params: dict[str, Any]) -> TaskResult[Any]:
    """Get advertising products."""
    pass

Get advertising products.

async def get_signals(self, params: dict[str, Any]) ‑> TaskResult[Any]
Expand source code
@abstractmethod
async def get_signals(self, params: dict[str, Any]) -> TaskResult[Any]:
    """Get signals."""
    pass

Get signals.

async def list_authorized_properties(self, params: dict[str, Any]) ‑> TaskResult[Any]
Expand source code
@abstractmethod
async def list_authorized_properties(self, params: dict[str, Any]) -> TaskResult[Any]:
    """List authorized properties."""
    pass

List authorized properties.

async def list_creative_formats(self, params: dict[str, Any]) ‑> TaskResult[Any]
Expand source code
@abstractmethod
async def list_creative_formats(self, params: dict[str, Any]) -> TaskResult[Any]:
    """List supported creative formats."""
    pass

List supported creative formats.

async def list_creatives(self, params: dict[str, Any]) ‑> TaskResult[Any]
Expand source code
@abstractmethod
async def list_creatives(self, params: dict[str, Any]) -> TaskResult[Any]:
    """List creatives."""
    pass

List creatives.

async def list_tools(self) ‑> list[str]
Expand source code
@abstractmethod
async def list_tools(self) -> list[str]:
    """
    List available tools from the agent.

    Returns:
        List of tool names
    """
    pass

List available tools from the agent.

Returns

List of tool names

async def provide_performance_feedback(self, params: dict[str, Any]) ‑> TaskResult[Any]
Expand source code
@abstractmethod
async def provide_performance_feedback(self, params: dict[str, Any]) -> TaskResult[Any]:
    """Provide performance feedback."""
    pass

Provide performance feedback.

async def sync_creatives(self, params: dict[str, Any]) ‑> TaskResult[Any]
Expand source code
@abstractmethod
async def sync_creatives(self, params: dict[str, Any]) -> TaskResult[Any]:
    """Sync creatives."""
    pass

Sync creatives.