Module adcp.protocols.mcp

Functions

def extract_adcp_error(result: Any) ‑> dict[str, typing.Any] | None
Expand source code
def extract_adcp_error(result: Any) -> dict[str, Any] | None:
    """Extract and validate an AdCP ``adcp_error`` object from an MCP result.

    Implements AdCP spec §Client Detection Order (MCP paths 1 + 5) from
    docs/building/implementation/transport-errors.mdx. Only applies when
    ``isError`` is truthy. Returns a validated error object or ``None``.
    """
    if not getattr(result, "isError", False):
        return None

    sc = getattr(result, "structuredContent", None)
    if isinstance(sc, dict):
        validated = _validate_adcp_error(sc.get("adcp_error"))
        if validated is not None:
            return validated

    for item in getattr(result, "content", None) or []:
        text = _text_of(item)
        # Apply the same 1MB pre-parse cap as the success path to prevent a
        # malicious server returning ``isError=true`` plus a giant payload from
        # forcing a multi-MB json.loads into memory before the 4KB validation
        # would reject it.
        if text is None or len(text) > _MAX_TEXT_SIZE_BYTES:
            continue
        try:
            parsed = json.loads(text)
        except (json.JSONDecodeError, ValueError):
            continue
        if isinstance(parsed, dict):
            validated = _validate_adcp_error(parsed.get("adcp_error"))
            if validated is not None:
                return validated
    return None

Extract and validate an AdCP adcp_error object from an MCP result.

Implements AdCP spec §Client Detection Order (MCP paths 1 + 5) from docs/building/implementation/transport-errors.mdx. Only applies when isError is truthy. Returns a validated error object or None.

def extract_adcp_success(result: Any) ‑> dict[str, typing.Any] | None
Expand source code
def extract_adcp_success(result: Any) -> dict[str, Any] | None:
    """Extract AdCP success response data from an MCP tool result.

    Implements the normative algorithm from AdCP spec §MCP Response Extraction
    (docs/building/implementation/mcp-response-extraction.mdx):

    1. If ``isError`` is truthy, return ``None`` — error extraction is a
       separate path.
    2. ``structuredContent`` — if present and a non-array object that is NOT
       an ``adcp_error``-only payload, return it.
    3. Text fallback — iterate ``content[]`` in order; for each ``type='text'``
       item within the 1MB size limit, ``json.loads`` and return the result
       if it is a non-array object that is NOT ``adcp_error``-only.
    4. No structured data found — return ``None``.
    """
    if getattr(result, "isError", False):
        return None

    sc = getattr(result, "structuredContent", None)
    if isinstance(sc, dict) and not (len(sc) == 1 and "adcp_error" in sc):
        return sc

    for item in getattr(result, "content", None) or []:
        text = _text_of(item)
        if text is None or len(text) > _MAX_TEXT_SIZE_BYTES:
            continue
        try:
            parsed = json.loads(text)
        except (json.JSONDecodeError, ValueError):
            continue
        if (
            isinstance(parsed, dict)
            and not (len(parsed) == 1 and "adcp_error" in parsed)
        ):
            return parsed
    return None

Extract AdCP success response data from an MCP tool result.

Implements the normative algorithm from AdCP spec §MCP Response Extraction (docs/building/implementation/mcp-response-extraction.mdx):

  1. If isError is truthy, return None — error extraction is a separate path.
  2. structuredContent — if present and a non-array object that is NOT an adcp_error-only payload, return it.
  3. Text fallback — iterate content[] in order; for each type='text' item within the 1MB size limit, json.loads and return the result if it is a non-array object that is NOT adcp_error-only.
  4. No structured data found — return None.

Classes

class MCPAdapter (*args: Any, **kwargs: Any)
Expand source code
class MCPAdapter(ProtocolAdapter):
    """Adapter for MCP protocol using official Python MCP SDK."""

    def __init__(self, *args: Any, **kwargs: Any):
        """Set up the adapter without opening a connection.

        All arguments are forwarded to the parent initializer.

        Raises:
            ImportError: If ``MCP_AVAILABLE`` is false.
        """
        super().__init__(*args, **kwargs)
        if not MCP_AVAILABLE:
            sdk_missing = (
                "MCP SDK not installed. "
                "Install with: pip install mcp (requires Python 3.10+)"
            )
            raise ImportError(sdk_missing)
        # Both stay None until _get_session() establishes a connection.
        self._exit_stack: Any = None
        self._session: Any = None

    async def _cleanup_failed_connection(self, context: str) -> None:
        """
        Release the current exit stack and session, logging instead of raising.

        Errors raised while closing are logged rather than propagated so that
        they cannot mask the error that triggered the cleanup. Only
        ``KeyboardInterrupt`` and ``SystemExit`` are re-raised.

        Args:
            context: Description of the context for logging (e.g., "during connection attempt")
        """
        stack = self._exit_stack
        if stack is None:
            return

        # Detach the state first so the adapter never keeps references to a
        # stack that is being (or failed to be) closed.
        self._exit_stack = None
        self._session = None

        try:
            await stack.aclose()
        except (KeyboardInterrupt, SystemExit):
            raise
        except asyncio.CancelledError:
            logger.debug(f"MCP session cleanup cancelled {context}")
            return
        except BaseException as cleanup_error:
            # ExceptionGroup wraps Exception subclasses (e.g., HTTPStatusError);
            # BaseExceptionGroup wraps BaseException subclasses such as
            # CancelledError. Either alias may be None when unavailable.
            group_types = tuple(
                group_type
                for group_type in (_ExceptionGroup, _BaseExceptionGroup)
                if group_type is not None
            )
            if not (group_types and isinstance(cleanup_error, group_types)):
                self._log_cleanup_error(cleanup_error, context)
                return

            # Separate cancellations from real failures in a single pass.
            real_errors = []
            cancelled_count = 0
            for member in cleanup_error.exceptions:  # type: ignore[attr-defined]
                if isinstance(member, asyncio.CancelledError):
                    cancelled_count += 1
                else:
                    real_errors.append(member)

            # A group made up solely of cancellations is just a cancellation.
            if not real_errors:
                logger.debug(f"MCP session cleanup cancelled {context}")
                return

            if cancelled_count > 0:
                logger.debug(
                    f"Skipping {cancelled_count} CancelledError(s) "
                    f"in mixed exception group {context}"
                )

            # Report every genuine failure individually.
            for member in real_errors:
                self._log_cleanup_error(member, context)

    def _log_cleanup_error(self, exc: BaseException, context: str) -> None:
        """Log a cleanup error at a level matching how expected it is.

        Args:
            exc: The exception raised during cleanup.
            context: Description of the cleanup context, appended to the log message.
        """
        lowered = str(exc).lower()

        # RuntimeErrors mentioning cancel scopes / async contexts are the
        # known httpx/anyio teardown noise after a failed connection.
        is_teardown_runtime_error = isinstance(exc, RuntimeError) and any(
            marker in lowered for marker in ("cancel scope", "async context")
        )
        # HTTP status errors surfacing during cleanup (only when httpx is available).
        is_http_status_error = bool(
            HTTPX_AVAILABLE and HTTPStatusError is not None and isinstance(exc, HTTPStatusError)
        )

        if is_teardown_runtime_error or is_http_status_error:
            # Expected: debug level, no stack trace.
            logger.debug(f"Ignoring expected cleanup error {context}: {exc}")
            return

        # Anything else is unexpected: warn and include the traceback.
        logger.warning(f"Unexpected error during cleanup {context}: {exc}", exc_info=True)

    async def _get_session(self) -> ClientSession:
        """
        Return the cached MCP client session, connecting on first use.

        The configured agent URI is tried first. If it does not already end in
        ``/mcp`` (ignoring trailing slashes), a second attempt is made with
        ``/mcp`` appended. Resources from a failed attempt are released before
        the next attempt starts.

        Returns:
            The initialized session, also stored on ``self._session``.

        Raises:
            ValueError: If the agent URI scheme is not ``http`` or ``https``.
            ADCPAuthenticationError: If the last failure text contains "401",
                "403" or "unauthorized".
            ADCPTimeoutError: If the last failure text contains "timeout".
            ADCPConnectionError: For any other failure of the last attempt.
        """
        if self._session is not None:
            return self._session  # type: ignore[no-any-return]

        config = self.agent_config
        logger.debug(f"Creating MCP session for agent {config.id}")

        # Only HTTP(S) endpoints are supported; reject anything else up front.
        parsed = urlparse(config.agent_uri)
        if parsed.scheme not in ("http", "https"):
            raise ValueError(f"Unsupported transport scheme: {parsed.scheme}")

        self._exit_stack = AsyncExitStack()

        # Authentication header: "Bearer <token>" for bearer auth, the raw
        # token for any other auth type.
        headers = {}
        token = config.auth_token
        if token:
            is_bearer = config.auth_type == "bearer"
            headers[config.auth_header] = f"Bearer {token}" if is_bearer else token

        # Candidate endpoints: the exact configured URL, then a "/mcp" fallback.
        configured_uri = config.agent_uri
        urls_to_try = [configured_uri]
        trimmed_uri = configured_uri.rstrip("/")
        if not trimmed_uri.endswith("/mcp"):
            urls_to_try.append(f"{trimmed_uri}/mcp")

        use_streamable_http = config.mcp_transport == "streamable_http"

        # RFC 9421 auto-signing: when ADCPClient installed a signing request
        # hook, route it into streamable_http through a custom httpx client
        # factory. The SSE transport offers no such knob, so warn and
        # continue unsigned.
        streamable_http_extra: dict[str, Any] = {}
        signing_hook = self.signing_request_hook
        if signing_hook is not None:
            if use_streamable_http:
                streamable_http_extra["httpx_client_factory"] = _make_signing_http_factory(
                    signing_hook
                )
            else:
                logger.warning(
                    "RFC 9421 auto-signing is not supported on MCP SSE "
                    "transport for agent %s; use mcp_transport='streamable_http' "
                    "to sign outgoing requests.",
                    config.id,
                )

        last_error = None
        for url in urls_to_try:
            try:
                if use_streamable_http:
                    # Streamable HTTP transport (newer, bidirectional)
                    transport = streamablehttp_client(
                        url,
                        headers=headers,
                        timeout=config.timeout,
                        **streamable_http_extra,
                    )
                    (
                        read_stream,
                        write_stream,
                        _get_session_id,
                    ) = await self._exit_stack.enter_async_context(transport)
                else:
                    # SSE transport (legacy, but widely supported)
                    read_stream, write_stream = await self._exit_stack.enter_async_context(
                        sse_client(url, headers=headers)
                    )

                self._session = await self._exit_stack.enter_async_context(
                    _ClientSession(read_stream, write_stream)
                )
                await self._session.initialize()

                logger.info(
                    f"Connected to MCP agent {config.id} at {url} "
                    f"using {config.mcp_transport} transport"
                )
                if url != configured_uri:
                    logger.info(
                        f"Note: Connected using fallback URL {url} "
                        f"(configured: {configured_uri})"
                    )

                return self._session  # type: ignore[no-any-return]
            except (KeyboardInterrupt, SystemExit):
                # Never swallow interpreter-level interrupts.
                raise
            except BaseException as exc:
                # BaseException (not Exception) so that a CancelledError coming
                # out of a failed initialization is handled as well.
                last_error = exc
                # Release the exit stack of the failed attempt to avoid leaks.
                await self._cleanup_failed_connection("during connection attempt")

                # More candidates left: start over with a fresh exit stack.
                if url != urls_to_try[-1]:
                    logger.debug(f"Retrying with next URL after error: {last_error}")
                    self._exit_stack = AsyncExitStack()
                    continue

                # Out of candidates: report and raise a classified error.
                logger.error(
                    f"Failed to connect to MCP agent {config.id} using "
                    f"{config.mcp_transport} transport. "
                    f"Tried URLs: {', '.join(urls_to_try)}"
                )

                error_str = str(last_error).lower()
                if any(marker in error_str for marker in ("401", "403", "unauthorized")):
                    from adcp.exceptions import ADCPAuthenticationError

                    raise ADCPAuthenticationError(
                        f"Authentication failed: {last_error}",
                        agent_id=config.id,
                        agent_uri=configured_uri,
                    ) from last_error
                if "timeout" in error_str:
                    raise ADCPTimeoutError(
                        f"Connection timeout: {last_error}",
                        agent_id=config.id,
                        agent_uri=configured_uri,
                        timeout=config.timeout,
                    ) from last_error
                raise ADCPConnectionError(
                    f"Failed to connect: {last_error}",
                    agent_id=config.id,
                    agent_uri=configured_uri,
                ) from last_error

        # This shouldn't be reached, but just in case
        raise RuntimeError(f"Failed to connect to MCP agent at {configured_uri}")

    def _serialize_mcp_content(self, content: list[Any]) -> list[dict[str, Any]]:
        """
        Translate MCP SDK content items into plain dictionaries.

        The MCP SDK hands back Pydantic objects (TextContent, ImageContent,
        etc.) while the rest of the ADCP client works with protocol-agnostic
        dicts, so the conversion happens here at the protocol boundary.

        Args:
            content: List of MCP content items (may be dicts or Pydantic objects)

        Returns:
            One plain dict per input item, in the same order.
        """

        def to_plain(entry: Any) -> dict[str, Any]:
            # Dicts are passed through as the same object.
            if isinstance(entry, dict):
                return entry
            # Pydantic v2 models expose model_dump().
            if hasattr(entry, "model_dump"):
                return entry.model_dump()
            # Pydantic v1 models expose a callable dict().
            if hasattr(entry, "dict") and callable(entry.dict):
                return entry.dict()
            # Generic objects: shallow copy of the instance attributes.
            if hasattr(entry, "__dict__"):
                return dict(entry.__dict__)
            # Nothing usable: keep a string rendering and warn.
            logger.warning(f"Unknown MCP content type: {type(entry)}, serializing as string")
            return {"type": "unknown", "data": str(entry)}

        return [to_plain(entry) for entry in content]

    async def _call_mcp_tool(self, tool_name: str, params: dict[str, Any]) -> TaskResult[Any]:
        """Call a tool over the MCP session and wrap the outcome in a TaskResult.

        Steps, in order:

        1. For tools that ``_idempotency.is_mutating`` reports as mutating,
           await ``idempotency_capability_check`` when one is configured.
        2. Let ``_idempotency.inject_key`` produce the params to send and the
           idempotency key for this call.
        3. Obtain the session and invoke ``call_tool`` with the operation name
           stamped into ``_signing_operation`` for the duration of the call.
        4. For ``isError`` results, extract the ``adcp_error`` object, raise
           for idempotency codes, otherwise return a FAILED result.
        5. For other results, extract the structured payload and return a
           COMPLETED result.

        Args:
            tool_name: Name of the MCP tool to call.
            params: Tool arguments.

        Returns:
            A COMPLETED ``TaskResult`` carrying the extracted data and the
            first text content item as message, or a FAILED ``TaskResult``
            carrying an error string. Debug info is attached only when
            ``agent_config.debug`` is set.

        Raises:
            IdempotencyConflictError, IdempotencyExpiredError: Re-raised as-is.
            Any exception from steps 1-2 also propagates, because they run
            before the ``try`` block. Every other ``Exception`` is converted
            into a FAILED ``TaskResult``.
        """
        start_time = time.time() if self.agent_config.debug else None
        debug_info = None
        debug_request: dict[str, Any] = {}
        # NOTE: the capability check and key injection run outside the try
        # block below, so their exceptions are not converted to TaskResult.
        if _idempotency.is_mutating(tool_name) and self.idempotency_capability_check:
            await self.idempotency_capability_check()
        params, idempotency_key = _idempotency.inject_key(
            tool_name, params, client_token=self.idempotency_client_token
        )

        try:
            session = await self._get_session()

            if self.agent_config.debug:
                # Params are passed through redact_params before being kept
                # for debug output.
                debug_request = {
                    "protocol": "MCP",
                    "tool": tool_name,
                    "params": _idempotency.redact_params(params),
                    "transport": self.agent_config.mcp_transport,
                }

            # Stamp the AdCP operation name so the httpx request event hook
            # installed by ADCPClient (when a SigningConfig is present) can
            # look up the seller's signing policy for this call. Scoped
            # tightly around call_tool so session.initialize() above and
            # other out-of-band traffic stay outside the signing scope.
            signing_token = _signing_operation.set(tool_name)
            try:
                # Call the tool using MCP client session
                result = await session.call_tool(tool_name, params)
            finally:
                # Always restore the previous value, even when call_tool raises.
                _signing_operation.reset(signing_token)

            # Check if this is an error response
            is_error = hasattr(result, "isError") and result.isError

            # Extract human-readable message from content: the first item of
            # type "text" with a non-empty text value wins.
            message_text = None
            if hasattr(result, "content") and result.content:
                serialized_content = self._serialize_mcp_content(result.content)
                if isinstance(serialized_content, list):
                    for item in serialized_content:
                        is_text = isinstance(item, dict) and item.get("type") == "text"
                        if is_text and item.get("text"):
                            message_text = item["text"]
                            break

            # Handle error responses per transport-errors.mdx §Client Detection
            # Order. Extract the adcp_error object from structuredContent first,
            # then from text fallback — whichever is present.
            if is_error:
                adcp_error = extract_adcp_error(result)
                # Raise typed idempotency exceptions before building a generic
                # TaskResult(failed), so callers that catch them distinctly
                # don't lose the signal.
                if adcp_error and adcp_error.get("code") in (
                    "IDEMPOTENCY_CONFLICT",
                    "IDEMPOTENCY_EXPIRED",
                ):
                    from adcp.exceptions import classify_task_error

                    raise classify_task_error(
                        tool_name, [adcp_error], agent_id=self.agent_config.id
                    )
                # FastMCP-style is_error with plain-text content: text-match
                # fallback for the two idempotency codes.
                _idempotency.raise_for_idempotency_text(
                    tool_name, message_text, self.agent_config.id
                )
                # Message precedence: structured adcp_error message, then the
                # first text content item, then a generic fallback.
                error_message = (
                    (adcp_error.get("message") if adcp_error else None)
                    or message_text
                    or "Tool execution failed"
                )
                if self.agent_config.debug and start_time:
                    duration_ms = (time.time() - start_time) * 1000
                    debug_info = DebugInfo(
                        request=debug_request,
                        response={
                            "error": error_message,
                            "is_error": True,
                            "adcp_error": adcp_error,
                        },
                        duration_ms=duration_ms,
                    )
                return TaskResult[Any](
                    status=TaskStatus.FAILED,
                    error=error_message,
                    success=False,
                    debug_info=debug_info,
                    idempotency_key=idempotency_key,
                )

            # Success extraction per mcp-response-extraction.mdx §Extraction
            # Algorithm: prefer structuredContent (MCP 2025-03-26+), fall back
            # to JSON-parsing content[].text for older servers (including the
            # AdCP reference training agent).
            data_to_return = extract_adcp_success(result)
            if data_to_return is None:
                # Raised inside the try block, so the generic handler below
                # turns it into a FAILED TaskResult.
                raise ValueError(
                    f"MCP tool {tool_name} returned no structured AdCP data. "
                    f"Neither structuredContent nor content[].text yielded a "
                    f"parseable non-adcp_error JSON object. "
                    f"Got content: {result.content if hasattr(result, 'content') else 'none'}"
                )

            if self.agent_config.debug and start_time:
                duration_ms = (time.time() - start_time) * 1000
                debug_info = DebugInfo(
                    request=debug_request,
                    response=_idempotency.deep_redact(
                        {
                            "data": data_to_return,
                            "message": message_text,
                            "is_error": False,
                        }
                    ),
                    duration_ms=duration_ms,
                )

            # NOTE(review): presumably raises when the payload itself carries
            # an idempotency error — confirm against _idempotency.
            _idempotency.raise_for_idempotency_error(
                tool_name, data_to_return, self.agent_config.id
            )

            # Return both the structured data and the human-readable message
            task_result = TaskResult[Any](
                status=TaskStatus.COMPLETED,
                data=data_to_return,
                message=message_text,
                success=True,
                debug_info=debug_info,
            )
            return _idempotency.annotate_result(task_result, idempotency_key)

        except (IdempotencyConflictError, IdempotencyExpiredError):
            # Propagate typed idempotency errors — callers MUST handle these
            # distinctly (mint fresh key / reconcile state). Other ADCPError
            # subclasses (connection, timeout) continue to be converted to
            # TaskResult(failed) below, preserving the existing contract.
            raise
        except Exception as e:
            # Everything else becomes a FAILED result carrying str(e).
            if self.agent_config.debug and start_time:
                duration_ms = (time.time() - start_time) * 1000
                debug_info = DebugInfo(
                    request=debug_request,
                    response={"error": str(e)},
                    duration_ms=duration_ms,
                )
            return TaskResult[Any](
                status=TaskStatus.FAILED,
                error=str(e),
                success=False,
                debug_info=debug_info,
                idempotency_key=idempotency_key,
            )

    # ========================================================================
    # ADCP Protocol Methods
    # ========================================================================

    async def get_products(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Fetch advertising products via the ``get_products`` MCP tool."""
        outcome = await self._call_mcp_tool("get_products", params)
        return outcome

    async def list_creative_formats(self, params: dict[str, Any]) -> TaskResult[Any]:
        """List supported creative formats via the ``list_creative_formats`` MCP tool."""
        outcome = await self._call_mcp_tool("list_creative_formats", params)
        return outcome

    async def sync_creatives(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Sync creatives via the ``sync_creatives`` MCP tool."""
        outcome = await self._call_mcp_tool("sync_creatives", params)
        return outcome

    async def list_creatives(self, params: dict[str, Any]) -> TaskResult[Any]:
        """List creatives via the ``list_creatives`` MCP tool."""
        outcome = await self._call_mcp_tool("list_creatives", params)
        return outcome

    async def get_media_buy_delivery(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Fetch media buy delivery via the ``get_media_buy_delivery`` MCP tool."""
        outcome = await self._call_mcp_tool("get_media_buy_delivery", params)
        return outcome

    async def get_media_buys(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Fetch media buys (status, creative approval state, optional delivery snapshots)."""
        outcome = await self._call_mcp_tool("get_media_buys", params)
        return outcome

    async def get_signals(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Fetch signals via the ``get_signals`` MCP tool."""
        outcome = await self._call_mcp_tool("get_signals", params)
        return outcome

    async def activate_signal(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Activate a signal via the ``activate_signal`` MCP tool."""
        outcome = await self._call_mcp_tool("activate_signal", params)
        return outcome

    async def provide_performance_feedback(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Submit performance feedback via the ``provide_performance_feedback`` MCP tool."""
        outcome = await self._call_mcp_tool("provide_performance_feedback", params)
        return outcome

    async def log_event(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Log an event via the ``log_event`` MCP tool."""
        outcome = await self._call_mcp_tool("log_event", params)
        return outcome

    async def sync_event_sources(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Sync event sources via the ``sync_event_sources`` MCP tool."""
        outcome = await self._call_mcp_tool("sync_event_sources", params)
        return outcome

    async def sync_audiences(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Sync audiences via the ``sync_audiences`` MCP tool."""
        outcome = await self._call_mcp_tool("sync_audiences", params)
        return outcome

    async def sync_catalogs(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Sync catalogs via the ``sync_catalogs`` MCP tool."""
        outcome = await self._call_mcp_tool("sync_catalogs", params)
        return outcome

    async def preview_creative(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Generate preview URLs for a creative manifest via ``preview_creative``."""
        outcome = await self._call_mcp_tool("preview_creative", params)
        return outcome

    async def create_media_buy(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Create a media buy via the ``create_media_buy`` MCP tool."""
        outcome = await self._call_mcp_tool("create_media_buy", params)
        return outcome

    async def update_media_buy(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Update a media buy via the ``update_media_buy`` MCP tool."""
        outcome = await self._call_mcp_tool("update_media_buy", params)
        return outcome

    async def build_creative(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Build a creative via the ``build_creative`` MCP tool."""
        outcome = await self._call_mcp_tool("build_creative", params)
        return outcome

    async def get_creative_delivery(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Fetch creative delivery via the ``get_creative_delivery`` MCP tool."""
        outcome = await self._call_mcp_tool("get_creative_delivery", params)
        return outcome

    async def list_accounts(self, params: dict[str, Any]) -> TaskResult[Any]:
        """List accounts via the ``list_accounts`` MCP tool."""
        outcome = await self._call_mcp_tool("list_accounts", params)
        return outcome

    async def sync_accounts(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Sync accounts via the ``sync_accounts`` MCP tool."""
        outcome = await self._call_mcp_tool("sync_accounts", params)
        return outcome

    async def get_account_financials(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Fetch account financials via the ``get_account_financials`` MCP tool."""
        outcome = await self._call_mcp_tool("get_account_financials", params)
        return outcome

    async def report_usage(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Report account usage via the ``report_usage`` MCP tool."""
        outcome = await self._call_mcp_tool("report_usage", params)
        return outcome

    async def list_tools(self) -> list[str]:
        """Return the names of the tools advertised by the MCP agent.

        Connects first if no session exists yet.
        """
        active_session = await self._get_session()
        tools_response = await active_session.list_tools()
        return [descriptor.name for descriptor in tools_response.tools]

    async def get_agent_info(self) -> dict[str, Any]:
        """
        Get agent information including AdCP extension metadata from MCP server.

        MCP servers may expose metadata through:
        - Server capabilities exposed during initialization
        - extensions.adcp in server info (if supported)
        - Tool list

        Malformed capability data (for example ``extensions`` or ``adcp``
        being ``null`` or not an object) is ignored rather than raising.

        Returns:
            Dictionary with ``name``, ``version`` and ``protocol`` keys, plus
            ``tools`` when the tool listing succeeds and is non-empty, and
            ``adcp_version`` / ``protocols_supported`` when a non-empty AdCP
            extension object is found in the server capabilities.
        """
        session = await self._get_session()

        # Extract basic MCP server info
        info: dict[str, Any] = {
            "name": getattr(session, "server_name", None),
            "version": getattr(session, "server_version", None),
            "protocol": "mcp",
        }

        # Get available tools; a failure here is logged and otherwise ignored
        # so that the remaining metadata is still returned.
        try:
            tools_result = await session.list_tools()
            tool_names = [tool.name for tool in tools_result.tools]
            if tool_names:
                info["tools"] = tool_names
        except Exception as e:
            logger.warning(f"Failed to list tools for {self.agent_config.id}: {e}")

        # Try to extract AdCP extension metadata from server capabilities.
        # The data is server-supplied, so each nesting level is type-checked
        # before use: a null or non-object value must not raise AttributeError.
        capabilities = getattr(session, "_server_capabilities", None)
        if isinstance(capabilities, dict):
            extensions = capabilities.get("extensions")
            adcp_ext = extensions.get("adcp") if isinstance(extensions, dict) else None
            if isinstance(adcp_ext, dict) and adcp_ext:
                info["adcp_version"] = adcp_ext.get("adcp_version")
                info["protocols_supported"] = adcp_ext.get("protocols_supported")

        logger.info(f"Retrieved agent info for {self.agent_config.id}")
        return info

    async def close(self) -> None:
        """Shut down the MCP session and release its resources.

        Delegates to ``_cleanup_failed_connection``, which logs cleanup errors
        instead of raising them.
        """
        cleanup_context = "during close"
        await self._cleanup_failed_connection(cleanup_context)

    # ========================================================================
    # V3 Protocol Methods - Protocol Discovery
    # ========================================================================

    async def get_adcp_capabilities(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Fetch the agent's AdCP capabilities via the ``get_adcp_capabilities`` MCP tool."""
        outcome = await self._call_mcp_tool("get_adcp_capabilities", params)
        return outcome

    # ========================================================================
    # V3 Protocol Methods - Content Standards
    # ========================================================================

    async def create_content_standards(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Create a content standards configuration via ``create_content_standards``."""
        outcome = await self._call_mcp_tool("create_content_standards", params)
        return outcome

    async def get_content_standards(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Fetch a content standards configuration via ``get_content_standards``."""
        outcome = await self._call_mcp_tool("get_content_standards", params)
        return outcome

    async def list_content_standards(self, params: dict[str, Any]) -> TaskResult[Any]:
        """List content standards configurations via ``list_content_standards``."""
        outcome = await self._call_mcp_tool("list_content_standards", params)
        return outcome

    async def update_content_standards(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Modify a content standards configuration via the ``update_content_standards`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "update_content_standards"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def calibrate_content(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Calibrate content against standards via the ``calibrate_content`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "calibrate_content"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def validate_content_delivery(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Check content delivery against standards via the ``validate_content_delivery`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "validate_content_delivery"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def get_media_buy_artifacts(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Fetch the artifacts tied to a media buy via the ``get_media_buy_artifacts`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "get_media_buy_artifacts"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    # ========================================================================
    # V3 Protocol Methods - Governance
    # ========================================================================

    async def get_creative_features(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Evaluate a creative's governance features via the ``get_creative_features`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "get_creative_features"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def sync_plans(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Synchronize campaign governance plans via the ``sync_plans`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "sync_plans"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def check_governance(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Check an action against campaign governance via the ``check_governance`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "check_governance"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def report_plan_outcome(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Report a governed action's outcome via the ``report_plan_outcome`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "report_plan_outcome"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def get_plan_audit_logs(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Fetch governance audit logs for plans via the ``get_plan_audit_logs`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "get_plan_audit_logs"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    # ========================================================================
    # V3 Protocol Methods - Sponsored Intelligence
    # ========================================================================

    async def si_get_offering(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Fetch a sponsored intelligence offering via the ``si_get_offering`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "si_get_offering"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def si_initiate_session(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Start a sponsored intelligence session via the ``si_initiate_session`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "si_initiate_session"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def si_send_message(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Send a message within a sponsored intelligence session via the ``si_send_message`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "si_send_message"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def si_terminate_session(self, params: dict[str, Any]) -> TaskResult[Any]:
        """End a sponsored intelligence session via the ``si_terminate_session`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "si_terminate_session"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    # ========================================================================
    # V3 Protocol Methods - Governance (Property Lists)
    # ========================================================================

    async def create_property_list(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Create a governance property list via the ``create_property_list`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "create_property_list"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def get_property_list(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Fetch a property list, optionally resolved, via the ``get_property_list`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "get_property_list"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def list_property_lists(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Enumerate property lists via the ``list_property_lists`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "list_property_lists"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def update_property_list(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Modify a property list via the ``update_property_list`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "update_property_list"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def delete_property_list(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Remove a property list via the ``delete_property_list`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "delete_property_list"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    # ========================================================================
    # V3 Protocol Methods - Governance (Collection Lists)
    # ========================================================================

    async def create_collection_list(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Create a governance collection list via the ``create_collection_list`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "create_collection_list"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def get_collection_list(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Fetch a collection list, optionally resolved, via the ``get_collection_list`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "get_collection_list"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def list_collection_lists(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Enumerate collection lists via the ``list_collection_lists`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "list_collection_lists"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def update_collection_list(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Modify a collection list via the ``update_collection_list`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "update_collection_list"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def delete_collection_list(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Remove a collection list via the ``delete_collection_list`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "delete_collection_list"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    # ========================================================================
    # V3 Protocol Methods - Governance (Sync Governance)
    # ========================================================================

    async def sync_governance(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Synchronize the governance agents attached to an account via the ``sync_governance`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "sync_governance"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    # ========================================================================
    # V3 Protocol Methods - TMP
    # ========================================================================

    async def context_match(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Match ad context to buyer packages via the ``context_match`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "context_match"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def identity_match(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Match a user identity for package eligibility via the ``identity_match`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "identity_match"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    # ========================================================================
    # V3 Protocol Methods - Brand Rights
    # ========================================================================

    async def get_brand_identity(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Fetch brand identity information via the ``get_brand_identity`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "get_brand_identity"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def get_rights(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Fetch the rights available for licensing via the ``get_rights`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "get_rights"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def acquire_rights(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Acquire rights for brand content usage via the ``acquire_rights`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "acquire_rights"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    async def update_rights(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Change the terms of an existing rights acquisition via the ``update_rights`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "update_rights"
        response = await self._call_mcp_tool(tool_name, params)
        return response

    # ========================================================================
    # V3 Protocol Methods - Compliance
    # ========================================================================

    async def comply_test_controller(self, params: dict[str, Any]) -> TaskResult[Any]:
        """Drive the compliance test controller (sandbox only) via the ``comply_test_controller`` MCP tool.

        Args:
            params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

        Returns:
            Whatever ``_call_mcp_tool`` returns for this tool call.
        """
        tool_name = "comply_test_controller"
        response = await self._call_mcp_tool(tool_name, params)
        return response

Adapter for the MCP protocol using the official Python MCP SDK.

Initialize adapter with agent configuration.

Ancestors

Methods

async def close(self) ‑> None
Expand source code
async def close(self) -> None:
    """Shut down the MCP session and release its resources.

    Delegates to ``_cleanup_failed_connection``, passing
    ``"during close"`` as the context string.
    """
    cleanup_context = "during close"
    await self._cleanup_failed_connection(cleanup_context)

Close the MCP session and clean up resources.

async def get_agent_info(self) ‑> dict[str, typing.Any]
Expand source code
async def get_agent_info(self) -> dict[str, Any]:
    """
    Get agent information including AdCP extension metadata from MCP server.

    Metadata is collected on a best-effort basis from:
    - The ``server_name`` / ``server_version`` attributes of the session
    - The tool list returned by ``session.list_tools()``
    - ``extensions.adcp`` inside the session's ``_server_capabilities``,
      when that attribute exists and is a dict

    Returns:
        Dictionary with agent metadata. ``name``, ``version`` and
        ``protocol`` are always present; ``tools``, ``adcp_version`` and
        ``protocols_supported`` are added only when available.
    """
    session = await self._get_session()

    # Extract basic MCP server info
    info: dict[str, Any] = {
        "name": getattr(session, "server_name", None),
        "version": getattr(session, "server_version", None),
        "protocol": "mcp",
    }

    # Get available tools; a failure here is logged and must not prevent the
    # rest of the metadata from being returned.
    try:
        tools_result = await session.list_tools()
        tool_names = [tool.name for tool in tools_result.tools]
        if tool_names:
            info["tools"] = tool_names
    except Exception as e:
        logger.warning(f"Failed to list tools for {self.agent_config.id}: {e}")

    # Try to extract AdCP extension metadata from server capabilities.
    # Each level is type-checked: a key that is present but holds null or a
    # non-dict value would otherwise raise AttributeError on ``.get`` and
    # abort this best-effort lookup.
    capabilities = getattr(session, "_server_capabilities", None)
    if isinstance(capabilities, dict):
        extensions = capabilities.get("extensions")
        adcp_ext = extensions.get("adcp") if isinstance(extensions, dict) else None
        if isinstance(adcp_ext, dict) and adcp_ext:
            info["adcp_version"] = adcp_ext.get("adcp_version")
            info["protocols_supported"] = adcp_ext.get("protocols_supported")

    logger.info(f"Retrieved agent info for {self.agent_config.id}")
    return info

Get agent information including AdCP extension metadata from MCP server.

MCP servers may expose metadata through:

- Server capabilities exposed during initialization
- extensions.adcp in server info (if supported)
- Tool list

Returns

Dictionary with agent metadata

async def list_tools(self) ‑> list[str]
Expand source code
async def list_tools(self) -> list[str]:
    """Return the names of the tools the MCP agent advertises.

    Obtains the session from ``_get_session``, awaits its ``list_tools()``
    call and collects the ``name`` attribute of each entry in the result's
    ``tools``.

    Returns:
        Tool names, in the order they appear in the result.
    """
    mcp_session = await self._get_session()
    listing = await mcp_session.list_tools()
    names = []
    for entry in listing.tools:
        names.append(entry.name)
    return names

List available tools from MCP agent.

async def preview_creative(self, params: dict[str, Any]) ‑> TaskResult[Any]
Expand source code
async def preview_creative(self, params: dict[str, Any]) -> TaskResult[Any]:
    """Generate preview URLs for a creative manifest via the ``preview_creative`` MCP tool.

    Args:
        params: Tool arguments, passed through to ``_call_mcp_tool`` as-is.

    Returns:
        Whatever ``_call_mcp_tool`` returns for this tool call.
    """
    tool_name = "preview_creative"
    response = await self._call_mcp_tool(tool_name, params)
    return response

Generate preview URLs for a creative manifest.

Inherited members