Module adcp.client

Classes

class ADCPClient (agent_config: AgentConfig,
webhook_url_template: str | None = None,
webhook_secret: str | None = None,
on_activity: Callable[[Activity], None] | None = None,
webhook_timestamp_tolerance: int = 300,
capabilities_ttl: float = 3600.0,
validate_features: bool = False,
strict_idempotency: bool = False,
signing: SigningConfig | None = None)
Expand source code
class ADCPClient:
    """Client for interacting with a single AdCP agent."""

    def __init__(
        self,
        agent_config: AgentConfig,
        webhook_url_template: str | None = None,
        webhook_secret: str | None = None,
        on_activity: Callable[[Activity], None] | None = None,
        webhook_timestamp_tolerance: int = 300,
        capabilities_ttl: float = 3600.0,
        validate_features: bool = False,
        strict_idempotency: bool = False,
        signing: SigningConfig | None = None,
    ):
        """
        Build a client bound to exactly one AdCP agent.

        Args:
            agent_config: Configuration of the agent to talk to; its
                ``protocol`` selects the adapter (A2A or MCP).
            webhook_url_template: Webhook URL template; may contain the
                placeholders {agent_id}, {task_type} and {operation_id}.
            webhook_secret: Secret used to verify webhook signatures.
            on_activity: Callback invoked with each activity event.
            webhook_timestamp_tolerance: Maximum age (in seconds) accepted for
                webhook timestamps. Webhooks whose timestamp is older than this,
                or further than this into the future, are rejected. Defaults to
                300 (5 minutes).
            capabilities_ttl: How long (seconds) cached capabilities stay
                valid (default: 1 hour).
            validate_features: When True, task calls first check that the
                seller supports the features they need (e.g., sync_audiences
                requires audience_targeting). Capabilities must already have
                been fetched for the check to run.
            strict_idempotency: When True, the seller must have declared
                ``adcp.idempotency.replay_ttl_seconds`` in its capabilities
                before any mutating call is sent. Capabilities are fetched
                lazily on first use. ``IdempotencyUnsupportedError`` is raised
                when the declaration is missing — a seller that does not
                declare it gives no retry-safety guarantee per AdCP #2315.
                Defaults to False for backward compatibility.
            signing: Optional RFC 9421 request-signing config. When given,
                ``Signature`` / ``Signature-Input`` / ``Content-Digest``
                headers are attached automatically to the operations the
                seller's ``request_signing`` capability lists under
                ``required_for``, ``warn_for``, or ``supported_for``. Whether
                the body is bound to the signature follows the seller's
                ``covers_content_digest`` policy. Generate a key with
                ``adcp-keygen`` and publish the public JWK at your
                ``jwks_uri``. Works on A2A and on MCP with
                ``mcp_transport="streamable_http"``; SSE-transport MCP logs a
                warning and sends the request unsigned.

        Raises:
            ValueError: If ``agent_config.protocol`` is neither A2A nor MCP.
        """
        import uuid

        # Plain configuration, stored verbatim.
        self.agent_config = agent_config
        self.webhook_url_template = webhook_url_template
        self.webhook_secret = webhook_secret
        self.on_activity = on_activity
        self.webhook_timestamp_tolerance = webhook_timestamp_tolerance
        self.capabilities_ttl = capabilities_ttl
        self.validate_features = validate_features
        self.strict_idempotency = strict_idempotency
        self.signing = signing

        # Capability cache state; filled in by refresh_capabilities().
        self._capabilities: GetAdcpCapabilitiesResponse | None = None
        self._feature_resolver: FeatureResolver | None = None
        self._capabilities_fetched_at: float | None = None
        self._idempotency_capability_verified: bool = False

        # Each instance gets its own token so that use_idempotency_key is
        # scoped to this client and cannot leak to sibling clients
        # (AdCP #2315 cross-seller risk).
        self._idempotency_client_token: str = uuid.uuid4().hex

        # Pick the protocol adapter matching the agent's declared protocol.
        adapter: ProtocolAdapter
        protocol = agent_config.protocol
        if protocol == Protocol.A2A:
            adapter = A2AAdapter(agent_config)
        elif protocol == Protocol.MCP:
            adapter = MCPAdapter(agent_config)
        else:
            raise ValueError(f"Unsupported protocol: {agent_config.protocol}")
        self.adapter = adapter

        # Wire the client-side hooks into the adapter.
        adapter.idempotency_client_token = self._idempotency_client_token
        if strict_idempotency:
            adapter.idempotency_capability_check = self._ensure_idempotency_capability
        if signing is not None:
            adapter.signing_request_hook = self._sign_outgoing_request

        # Imported here rather than at module top to avoid a circular import.
        from adcp.simple import SimpleAPI

        self.simple = SimpleAPI(self)

    async def _ensure_idempotency_capability(self) -> None:
        """Verify the seller positively declares idempotency support in capabilities.

        Called before every mutating request when ``strict_idempotency=True``.
        Fetches capabilities on first invocation; subsequent calls are no-ops
        once the declaration has been observed. Raises
        ``IdempotencyUnsupportedError`` when ``adcp.idempotency`` is missing,
        declares ``supported=False`` (seller does not dedupe — naive retry
        would double-process), or declares ``supported=True`` without a
        ``replay_ttl_seconds`` window.

        Sets ``_idempotency_capability_verified = True`` BEFORE calling
        ``fetch_capabilities`` so any recursive dispatch through the adapter
        terminates (``get_adcp_capabilities`` is non-mutating, so it would
        short-circuit anyway — but this guard protects against future refactors
        that might add it to the mutating set).
        """
        from adcp.exceptions import IdempotencyUnsupportedError

        if self._idempotency_capability_verified:
            return

        self._idempotency_capability_verified = True
        try:
            caps = await self.fetch_capabilities()
            adcp_info = getattr(caps, "adcp", None)
            idempotency_info = getattr(adcp_info, "idempotency", None) if adcp_info else None

            if idempotency_info is None:
                raise IdempotencyUnsupportedError(
                    agent_id=self.agent_config.id,
                    agent_uri=self.agent_config.agent_uri,
                    reason="seller did not declare adcp.idempotency",
                )

            supported = getattr(idempotency_info, "supported", None)
            if supported is False:
                raise IdempotencyUnsupportedError(
                    agent_id=self.agent_config.id,
                    agent_uri=self.agent_config.agent_uri,
                    reason="seller declared adcp.idempotency.supported=false",
                )

            ttl = getattr(idempotency_info, "replay_ttl_seconds", None)
            if ttl is None:
                raise IdempotencyUnsupportedError(
                    agent_id=self.agent_config.id,
                    agent_uri=self.agent_config.agent_uri,
                    reason=(
                        "seller declared adcp.idempotency.supported=true but omitted "
                        "replay_ttl_seconds"
                    ),
                )
        except Exception:
            self._idempotency_capability_verified = False
            raise

    async def _sign_outgoing_request(self, request: httpx.Request) -> None:
        """Attach RFC 9421 signature headers to an outgoing httpx request.

        Registered as the adapter's signing hook when a ``SigningConfig`` was
        given to ``ADCPClient``. The seller's advertised ``request_signing``
        capability decides what gets signed: only operations listed in
        ``required_for``, ``warn_for``, or ``supported_for``. Everything else
        (including the agent-card fetch and ``get_adcp_capabilities`` itself)
        goes out unsigned. The ``covers_content_digest`` tri-state decides
        whether the body is bound to the signature.

        Args:
            request: The request about to be sent; its headers are mutated
                in place when the operation is signed.
        """
        if self.signing is None:
            return

        operation = _signing_current_operation.get()
        # Two reasons to bail out before touching capabilities:
        #
        # * operation is None — the ContextVar is unset, meaning this is an
        #   out-of-band call (agent-card fetch, session initialize, etc).
        # * get_adcp_capabilities — bootstrap carve-out: signing it would
        #   need capabilities we do not have yet, and a pathological seller
        #   listing this op in its own required_for would make us recurse.
        #
        # Keep the carve-out narrow: only operations strictly needed to
        # *obtain* capabilities belong here. Today that is just
        # get_adcp_capabilities; a future adapter adding another
        # capabilities-precondition op MUST extend this guard.
        if operation is None or operation == "get_adcp_capabilities":
            return

        caps = await self.fetch_capabilities()
        req_signing = getattr(caps, "request_signing", None)

        # Surface a contradictory seller config: supported=False means
        # "signatures are ignored", yet required_for/warn_for are populated.
        # The classifier still skips (matching verifier behavior), but the
        # silent downgrade would hide a config bug, so log it.
        if req_signing is not None and not req_signing.supported:
            if req_signing.required_for or req_signing.warn_for:
                logger.warning(
                    "Seller %s advertises request_signing.supported=false but "
                    "populates required_for/warn_for — treating as unsupported "
                    "per spec. Verify the seller's capability advertisement.",
                    self.agent_config.id,
                )

        if operation_needs_signing(req_signing, operation) == "skip":
            return

        covers_policy: str | None = None
        if req_signing is not None and req_signing.covers_content_digest is not None:
            covers_policy = req_signing.covers_content_digest.value
        # Only an explicit "forbidden" turns digest coverage off. "required"
        # demands it, and for "either"/absent the signer chooses — we default
        # to the stricter option.
        cover_digest = not (covers_policy == "forbidden")

        config = self.signing
        signed = sign_request(
            method=request.method,
            url=str(request.url),
            headers=dict(request.headers),
            body=request.content,
            private_key=config.private_key,
            key_id=config.key_id,
            alg=config.alg,
            cover_content_digest=cover_digest,
            tag=config.tag,
        )

        # Remove any same-named header first, then set ours, so the signed
        # values win even if another hook or earlier layer already added one.
        for name, value in signed.as_dict().items():
            request.headers.pop(name, None)
            request.headers[name] = value

    def get_webhook_url(self, task_type: str, operation_id: str) -> str:
        """Generate webhook URL for a task."""
        if not self.webhook_url_template:
            raise ValueError("webhook_url_template not configured")

        return self.webhook_url_template.format(
            agent_id=self.agent_config.id,
            task_type=task_type,
            operation_id=operation_id,
        )

    def _emit_activity(self, activity: Activity) -> None:
        """Emit activity event."""
        if self.on_activity:
            self.on_activity(activity)

    @contextlib.contextmanager
    def use_idempotency_key(self, key: str) -> Iterator[str]:
        """Pin ``key`` as the ``idempotency_key`` for the next mutating call on THIS client.

        Intended for keys you have persisted yourself (e.g., in a buyer-side
        database) and need the SDK to resend verbatim when resuming or
        retrying across process restarts. The key is checked against
        ``^[A-Za-z0-9_.:-]{16,255}$`` on entry; a malformed key raises
        ``ValueError``.

        Scope rules:

        * **Single-use within scope.** The first mutating call inside the
          ``with`` block consumes the pinned key; a second one falls through
          to a fresh UUID. This keeps ``asyncio.gather`` siblings from
          sharing the key by accident (which would trigger
          ``IDEMPOTENCY_CONFLICT`` or silently duplicate work). To retry,
          wrap each attempt in its own ``with`` block.
        * **Client-scoped.** Only calls on THIS client see the pinned key. A
          mutating call on a sibling ``ADCPClient`` inside the same ``with``
          block generates a fresh key and emits a ``UserWarning`` — keys
          must be unique per (seller, request) pair (AdCP #2315).
        * **No nesting.** Entering ``use_idempotency_key`` again on the same
          client while one is active raises ``RuntimeError``.

        Example::

            with client.use_idempotency_key(campaign.stored_key):
                result = await client.create_media_buy(request)

        Yields:
            The pinned key, unchanged.
        """
        from adcp import _idempotency

        _idempotency.validate_key(key)

        # The registry is keyed by this client's private token, which is what
        # confines the pinned key to this instance.
        client_token = self._idempotency_client_token
        already_pinned = client_token in _idempotency._scoped_keys
        if already_pinned:
            raise RuntimeError(
                "use_idempotency_key is already active on this client; "
                "nested usage is not supported."
            )

        _idempotency._scoped_keys[client_token] = key
        try:
            yield key
        finally:
            # The default makes the removal tolerate an entry that is already gone.
            _idempotency._scoped_keys.pop(client_token, None)

    # ========================================================================
    # Capability Validation
    # ========================================================================

    @property
    def capabilities(self) -> GetAdcpCapabilitiesResponse | None:
        """Return cached capabilities, or None if not yet fetched."""
        return self._capabilities

    @property
    def feature_resolver(self) -> FeatureResolver | None:
        """Return the FeatureResolver for cached capabilities, or None."""
        return self._feature_resolver

    async def fetch_capabilities(self) -> GetAdcpCapabilitiesResponse:
        """Fetch capabilities, using cache if still valid.

        Returns:
            The seller's capabilities response.
        """
        if self._capabilities is not None and self._capabilities_fetched_at is not None:
            elapsed = time.monotonic() - self._capabilities_fetched_at
            if elapsed < self.capabilities_ttl:
                return self._capabilities

        return await self.refresh_capabilities()

    async def refresh_capabilities(self) -> GetAdcpCapabilitiesResponse:
        """Query the seller for capabilities, ignoring any cached copy.

        On success the cache, the ``FeatureResolver`` and the fetch timestamp
        are all replaced.

        Returns:
            The seller's capabilities response.

        Raises:
            ADCPError: If the call did not succeed or returned no data.
        """
        result = await self.get_adcp_capabilities(GetAdcpCapabilitiesRequest())

        if not result.success or result.data is None:
            raise ADCPError(
                f"Failed to fetch capabilities: {result.error or result.message}",
                agent_id=self.agent_config.id,
                agent_uri=self.agent_config.agent_uri,
            )

        capabilities = result.data
        self._capabilities = capabilities
        self._feature_resolver = FeatureResolver(capabilities)
        # Monotonic clock: the TTL check must not be affected by wall-clock jumps.
        self._capabilities_fetched_at = time.monotonic()
        return capabilities

    def _ensure_resolver(self) -> FeatureResolver:
        """Return the FeatureResolver, raising if capabilities haven't been fetched."""
        if self._feature_resolver is None:
            raise ADCPError(
                "Cannot check feature support: capabilities have not been fetched. "
                "Call fetch_capabilities() first.",
                agent_id=self.agent_config.id,
                agent_uri=self.agent_config.agent_uri,
            )
        return self._feature_resolver

    def supports(self, feature: str) -> bool:
        """Check if the seller supports a feature.

        Supports multiple feature namespaces:
        - Protocol support: ``supports("media_buy")`` checks ``supported_protocols``
        - Extension support: ``supports("ext:scope3")`` checks ``extensions_supported``
        - Targeting: ``supports("targeting.geo_countries")`` checks
          ``media_buy.execution.targeting``
        - Media buy features: ``supports("audience_targeting")`` checks
          ``media_buy.features``
        - Signals features: ``supports("catalog_signals")`` checks
          ``signals.features``

        Args:
            feature: Feature identifier to check.

        Returns:
            True if the seller declares the feature as supported.

        Raises:
            ADCPError: If capabilities have not been fetched yet.
        """
        return self._ensure_resolver().supports(feature)

    def require(self, *features: str) -> None:
        """Assert that the seller supports all listed features.

        Args:
            *features: Feature identifiers to require.

        Raises:
            ADCPFeatureUnsupportedError: If any features are not supported.
            ADCPError: If capabilities have not been fetched yet.
        """
        self._ensure_resolver().require(
            *features,
            agent_id=self.agent_config.id,
            agent_uri=self.agent_config.agent_uri,
        )

    def _validate_task_features(self, task_name: str) -> None:
        """Check feature requirements for a task if validate_features is enabled.

        Returns without checking if validate_features is False or capabilities
        haven't been fetched yet (logs a warning in the latter case).
        """
        if not self.validate_features:
            return
        if self._feature_resolver is None:
            logger.warning(
                "validate_features is enabled but capabilities have not been fetched. "
                "Call fetch_capabilities() to enable feature validation."
            )
            return
        required_feature = TASK_FEATURE_MAP.get(task_name)
        if required_feature is None:
            return
        self.require(required_feature)

    async def get_products(
        self,
        request: GetProductsRequest,
        fetch_previews: bool = False,
        preview_output_format: str = "url",
        creative_agent_client: ADCPClient | None = None,
    ) -> TaskResult[GetProductsResponse]:
        """
        Fetch advertising products from the agent.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the adapter's status) after it.

        Args:
            request: Request parameters
            fetch_previews: If True, generate preview URLs for each product's
                formats (uses batch API for 5-10x performance improvement)
            preview_output_format: "url" for iframe URLs (default), "html" for
                direct embedding (2-3x faster, no iframe overhead)
            creative_agent_client: Client for the creative agent (required if
                fetch_previews=True)

        Returns:
            TaskResult containing GetProductsResponse; when previews were
            generated they are stored under ``metadata["products_with_previews"]``.

        Raises:
            ValueError: If fetch_previews=True but creative_agent_client is not provided
        """
        if fetch_previews and not creative_agent_client:
            raise ValueError("creative_agent_client is required when fetch_previews=True")

        task = "get_products"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.get_products(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        result: TaskResult[GetProductsResponse] = self.adapter._parse_response(
            raw, GetProductsResponse
        )

        # Previews are only attempted for a successful, non-empty result.
        if not (fetch_previews and result.success and result.data and creative_agent_client):
            return result

        from adcp.utils.preview_cache import add_preview_urls_to_products

        enriched = await add_preview_urls_to_products(
            result.data.products,
            creative_agent_client,
            use_batch=True,
            output_format=preview_output_format,
        )
        result.metadata = result.metadata or {}
        result.metadata["products_with_previews"] = enriched
        return result

    async def list_creative_formats(
        self,
        request: ListCreativeFormatsRequest,
        fetch_previews: bool = False,
        preview_output_format: str = "url",
    ) -> TaskResult[ListCreativeFormatsResponse]:
        """
        List the creative formats the agent supports.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the adapter's status) after it.

        Args:
            request: Request parameters
            fetch_previews: If True, generate preview URLs for each format using
                sample manifests (uses batch API for 5-10x performance improvement)
            preview_output_format: "url" for iframe URLs (default), "html" for
                direct embedding (2-3x faster, no iframe overhead)

        Returns:
            TaskResult containing ListCreativeFormatsResponse; when previews were
            generated they are stored under ``metadata["formats_with_previews"]``.
        """
        task = "list_creative_formats"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.list_creative_formats(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        result: TaskResult[ListCreativeFormatsResponse] = self.adapter._parse_response(
            raw, ListCreativeFormatsResponse
        )

        # Previews are only attempted for a successful, non-empty result.
        if not (fetch_previews and result.success and result.data):
            return result

        from adcp.utils.preview_cache import add_preview_urls_to_formats

        # This client is itself passed as the client used to build previews.
        enriched = await add_preview_urls_to_formats(
            result.data.formats,
            self,
            use_batch=True,
            output_format=preview_output_format,
        )
        result.metadata = result.metadata or {}
        result.metadata["formats_with_previews"] = enriched
        return result

    async def preview_creative(
        self,
        request: PreviewCreativeRequest,
    ) -> TaskResult[PreviewCreativeResponse]:
        """
        Render a preview for a creative manifest.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the adapter's status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing PreviewCreativeResponse with preview URLs
        """
        task = "preview_creative"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.preview_creative(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, PreviewCreativeResponse)

    async def sync_creatives(
        self,
        request: SyncCreativesRequest,
    ) -> TaskResult[SyncCreativesResponse]:
        """
        Send a ``sync_creatives`` task to the agent.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the adapter's status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing SyncCreativesResponse
        """
        task = "sync_creatives"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.sync_creatives(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, SyncCreativesResponse)

    async def list_creatives(
        self,
        request: ListCreativesRequest,
    ) -> TaskResult[ListCreativesResponse]:
        """
        Send a ``list_creatives`` task to the agent.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the adapter's status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing ListCreativesResponse
        """
        task = "list_creatives"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.list_creatives(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, ListCreativesResponse)

    async def get_media_buy_delivery(
        self,
        request: GetMediaBuyDeliveryRequest,
    ) -> TaskResult[GetMediaBuyDeliveryResponse]:
        """
        Send a ``get_media_buy_delivery`` task to the agent.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the adapter's status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing GetMediaBuyDeliveryResponse
        """
        task = "get_media_buy_delivery"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.get_media_buy_delivery(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, GetMediaBuyDeliveryResponse)

    async def get_media_buys(
        self,
        request: GetMediaBuysRequest,
    ) -> TaskResult[GetMediaBuysResponse]:
        """
        Send a ``get_media_buys`` task to the agent.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the adapter's status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing GetMediaBuysResponse
        """
        task = "get_media_buys"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.get_media_buys(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, GetMediaBuysResponse)

    async def get_signals(
        self,
        request: GetSignalsRequest,
    ) -> TaskResult[GetSignalsResponse]:
        """
        Send a ``get_signals`` task to the agent.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the adapter's status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing GetSignalsResponse
        """
        task = "get_signals"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.get_signals(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, GetSignalsResponse)

    async def activate_signal(
        self,
        request: ActivateSignalRequest,
    ) -> TaskResult[ActivateSignalResponse]:
        """
        Send an ``activate_signal`` task to the agent.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the adapter's status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing ActivateSignalResponse
        """
        task = "activate_signal"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.activate_signal(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, ActivateSignalResponse)

    async def provide_performance_feedback(
        self,
        request: ProvidePerformanceFeedbackRequest,
    ) -> TaskResult[ProvidePerformanceFeedbackResponse]:
        """
        Send a ``provide_performance_feedback`` task and return the parsed result.

        The request is serialized to JSON-compatible params with ``None`` fields
        dropped. A PROTOCOL_REQUEST activity is emitted before the adapter call
        and a PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing ProvidePerformanceFeedbackResponse
        """
        task_name = "provide_performance_feedback"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.provide_performance_feedback(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, ProvidePerformanceFeedbackResponse)

    async def create_media_buy(
        self,
        request: CreateMediaBuyRequest,
    ) -> TaskResult[CreateMediaBuyResponse]:
        """
        Reserve inventory for a campaign by creating a media buy.

        The agent is asked to reserve inventory and responds with a media_buy_id
        that identifies the reservation and can be used for later updates.

        Args:
            request: Media buy creation parameters including:
                - brand: Brand reference; resolved from brand.json or the registry at execution
                - packages: List of package requests specifying desired inventory
                - publisher_properties: Target properties for ad placement
                - budget: Optional budget constraints
                - start_date/end_date: Campaign flight dates

        Returns:
            TaskResult containing CreateMediaBuyResponse with:
                - media_buy_id: Unique identifier for this reservation
                - status: Current state of the media buy
                - packages: Confirmed package details
                - Additional platform-specific metadata

        Example:
            >>> from adcp import ADCPClient, CreateMediaBuyRequest, BrandReference
            >>> client = ADCPClient(agent_config)
            >>> request = CreateMediaBuyRequest(
            ...     brand=BrandReference(domain="acme.com"),
            ...     packages=[package_request],
            ...     publisher_properties=properties,
            ... )
            >>> result = await client.create_media_buy(request)
            >>> if result.success:
            ...     media_buy_id = result.data.media_buy_id
        """
        task_name = "create_media_buy"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.create_media_buy(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, CreateMediaBuyResponse)

    async def update_media_buy(
        self,
        request: UpdateMediaBuyRequest,
    ) -> TaskResult[UpdateMediaBuyResponse]:
        """
        Modify a media buy that was created earlier.

        Changes packages or publisher properties on an existing reservation.
        What gets changed is expressed through a discriminated union: either
        package details or targeting properties.

        Args:
            request: Media buy update parameters including:
                - media_buy_id: Identifier from create_media_buy response
                - updates: Discriminated union specifying update type:
                    * UpdateMediaBuyPackagesRequest: Modify package selections
                    * UpdateMediaBuyPropertiesRequest: Change targeting properties

        Returns:
            TaskResult containing UpdateMediaBuyResponse with:
                - media_buy_id: The updated media buy identifier
                - status: Updated state of the media buy
                - packages: Updated package configurations
                - Additional platform-specific metadata

        Example:
            >>> from adcp import ADCPClient, UpdateMediaBuyPackagesRequest
            >>> client = ADCPClient(agent_config)
            >>> request = UpdateMediaBuyPackagesRequest(
            ...     media_buy_id="mb_123",
            ...     packages=[updated_package]
            ... )
            >>> result = await client.update_media_buy(request)
            >>> if result.success:
            ...     updated_packages = result.data.packages
        """
        task_name = "update_media_buy"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.update_media_buy(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, UpdateMediaBuyResponse)

    async def build_creative(
        self,
        request: BuildCreativeRequest,
    ) -> TaskResult[BuildCreativeResponse]:
        """
        Build final, production-ready creative assets.

        Asks the creative agent to produce deliverable assets in the target
        format (e.g., VAST, DAAST, HTML5). Usually invoked once a creative
        manifest has been previewed and approved.

        Args:
            request: Creative build parameters including:
                - manifest: Creative manifest with brand info and content
                - target_format_id: Desired output format identifier
                - inputs: Optional user-provided inputs for template variables
                - deployment: Platform or agent deployment configuration

        Returns:
            TaskResult containing BuildCreativeResponse with:
                - assets: Production-ready creative files (URLs or inline content)
                - format_id: The generated format identifier
                - manifest: The creative manifest used for generation
                - metadata: Additional platform-specific details

        Example:
            >>> from adcp import ADCPClient, BuildCreativeRequest
            >>> client = ADCPClient(agent_config)
            >>> request = BuildCreativeRequest(
            ...     manifest=creative_manifest,
            ...     target_format_id="vast_2.0",
            ...     inputs={"duration": 30}
            ... )
            >>> result = await client.build_creative(request)
            >>> if result.success:
            ...     vast_url = result.data.assets[0].url
        """
        task_name = "build_creative"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.build_creative(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, BuildCreativeResponse)

    async def list_accounts(
        self,
        request: ListAccountsRequest,
    ) -> TaskResult[ListAccountsResponse]:
        """
        Send a ``list_accounts`` task to the agent and return the parsed result.

        The request is serialized to JSON-compatible params with ``None`` fields
        dropped. A PROTOCOL_REQUEST activity is emitted before the adapter call
        and a PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing ListAccountsResponse
        """
        task_name = "list_accounts"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.list_accounts(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, ListAccountsResponse)

    async def sync_accounts(
        self,
        request: SyncAccountsRequest,
    ) -> TaskResult[SyncAccountsResponse]:
        """
        Send a ``sync_accounts`` task to the agent and return the parsed result.

        The request is serialized to JSON-compatible params with ``None`` fields
        dropped. A PROTOCOL_REQUEST activity is emitted before the adapter call
        and a PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing SyncAccountsResponse
        """
        task_name = "sync_accounts"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.sync_accounts(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, SyncAccountsResponse)

    async def get_account_financials(
        self,
        request: GetAccountFinancialsRequest,
    ) -> TaskResult[GetAccountFinancialsResponse]:
        """
        Send a ``get_account_financials`` task and return the parsed result.

        The request is serialized to JSON-compatible params with ``None`` fields
        dropped. A PROTOCOL_REQUEST activity is emitted before the adapter call
        and a PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing GetAccountFinancialsResponse
        """
        task_name = "get_account_financials"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.get_account_financials(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, GetAccountFinancialsResponse)

    async def report_usage(
        self,
        request: ReportUsageRequest,
    ) -> TaskResult[ReportUsageResponse]:
        """
        Send a ``report_usage`` task to the agent and return the parsed result.

        The request is serialized to JSON-compatible params with ``None`` fields
        dropped. A PROTOCOL_REQUEST activity is emitted before the adapter call
        and a PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing ReportUsageResponse
        """
        task_name = "report_usage"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.report_usage(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, ReportUsageResponse)

    async def log_event(
        self,
        request: LogEventRequest,
    ) -> TaskResult[LogEventResponse]:
        """
        Send a ``log_event`` task to the agent and return the parsed result.

        ``_validate_task_features`` is invoked for this task before anything
        else happens. The request is then serialized to JSON-compatible params
        with ``None`` fields dropped; a PROTOCOL_REQUEST activity is emitted
        before the adapter call and a PROTOCOL_RESPONSE activity (carrying the
        raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing LogEventResponse
        """
        task_name = "log_event"
        # Feature gate runs first, before an operation id is allocated.
        self._validate_task_features(task_name)
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.log_event(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, LogEventResponse)

    async def sync_event_sources(
        self,
        request: SyncEventSourcesRequest,
    ) -> TaskResult[SyncEventSourcesResponse]:
        """
        Send a ``sync_event_sources`` task to the agent and return the parsed result.

        ``_validate_task_features`` is invoked for this task before anything
        else happens. The request is then serialized to JSON-compatible params
        with ``None`` fields dropped; a PROTOCOL_REQUEST activity is emitted
        before the adapter call and a PROTOCOL_RESPONSE activity (carrying the
        raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing SyncEventSourcesResponse
        """
        task_name = "sync_event_sources"
        # Feature gate runs first, before an operation id is allocated.
        self._validate_task_features(task_name)
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.sync_event_sources(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, SyncEventSourcesResponse)

    async def sync_audiences(
        self,
        request: SyncAudiencesRequest,
    ) -> TaskResult[SyncAudiencesResponse]:
        """
        Send a ``sync_audiences`` task to the agent and return the parsed result.

        Feature validation is invoked for this task before anything else
        happens, the same way ``log_event`` and ``sync_event_sources`` do it.
        The client's ``validate_features`` option documents ``sync_audiences``
        (requires ``audience_targeting``) as a task covered by that check, so
        skipping the call here would silently bypass the option.

        The request is then serialized to JSON-compatible params with ``None``
        fields dropped; a PROTOCOL_REQUEST activity is emitted before the
        adapter call and a PROTOCOL_RESPONSE activity (carrying the raw status)
        after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing SyncAudiencesResponse

        Raises:
            Whatever ``_validate_task_features`` raises when validation is
            enabled and fails (exact exception type not visible here; confirm
            against that helper).
        """
        task_name = "sync_audiences"
        # Feature gate runs first, before an operation id is allocated or any
        # activity is emitted, so a rejected call leaves no protocol trace.
        self._validate_task_features(task_name)
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.sync_audiences(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, SyncAudiencesResponse)

    async def sync_catalogs(
        self,
        request: SyncCatalogsRequest,
    ) -> TaskResult[SyncCatalogsResponse]:
        """
        Send a ``sync_catalogs`` task to the agent and return the parsed result.

        The request is serialized to JSON-compatible params with ``None`` fields
        dropped. A PROTOCOL_REQUEST activity is emitted before the adapter call
        and a PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing SyncCatalogsResponse
        """
        task_name = "sync_catalogs"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.sync_catalogs(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, SyncCatalogsResponse)

    async def get_creative_delivery(
        self,
        request: GetCreativeDeliveryRequest,
    ) -> TaskResult[GetCreativeDeliveryResponse]:
        """
        Send a ``get_creative_delivery`` task and return the parsed result.

        The request is serialized to JSON-compatible params with ``None`` fields
        dropped. A PROTOCOL_REQUEST activity is emitted before the adapter call
        and a PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing GetCreativeDeliveryResponse
        """
        task_name = "get_creative_delivery"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.get_creative_delivery(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, GetCreativeDeliveryResponse)

    # ========================================================================
    # V3 Protocol Methods - Protocol Discovery
    # ========================================================================

    async def get_adcp_capabilities(
        self,
        request: GetAdcpCapabilitiesRequest,
    ) -> TaskResult[GetAdcpCapabilitiesResponse]:
        """
        Ask the agent which AdCP capabilities it supports.

        Retrieves the agent's supported AdCP features, protocol versions, and
        per-domain capabilities (media_buy, signals, sponsored_intelligence).

        Args:
            request: Request parameters including optional protocol filters

        Returns:
            TaskResult containing GetAdcpCapabilitiesResponse with:
                - adcp: Core protocol version information
                - supported_protocols: List of supported domain protocols
                - media_buy: Media buy capabilities (if supported)
                - sponsored_intelligence: SI capabilities (if supported)
                - signals: Signals capabilities (if supported)
        """
        task_name = "get_adcp_capabilities"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.get_adcp_capabilities(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, GetAdcpCapabilitiesResponse)

    # ========================================================================
    # V3 Protocol Methods - Content Standards
    # ========================================================================

    async def create_content_standards(
        self,
        request: CreateContentStandardsRequest,
    ) -> TaskResult[CreateContentStandardsResponse]:
        """
        Register a new content standards configuration with the agent.

        A configuration describes which content contexts are acceptable for ad
        placement, expressed as a natural language policy with optional
        calibration exemplars.

        Args:
            request: Request parameters including policy and scope

        Returns:
            TaskResult containing CreateContentStandardsResponse with standards_id
        """
        task_name = "create_content_standards"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.create_content_standards(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, CreateContentStandardsResponse)

    async def get_content_standards(
        self,
        request: GetContentStandardsRequest,
    ) -> TaskResult[GetContentStandardsResponse]:
        """
        Fetch a single content standards configuration by its ID.

        The request is serialized to JSON-compatible params with ``None`` fields
        dropped. A PROTOCOL_REQUEST activity is emitted before the adapter call
        and a PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters including standards_id

        Returns:
            TaskResult containing GetContentStandardsResponse
        """
        task_name = "get_content_standards"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.get_content_standards(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, GetContentStandardsResponse)

    async def list_content_standards(
        self,
        request: ListContentStandardsRequest,
    ) -> TaskResult[ListContentStandardsResponse]:
        """
        Enumerate content standards configurations known to the agent.

        The request is serialized to JSON-compatible params with ``None`` fields
        dropped. A PROTOCOL_REQUEST activity is emitted before the adapter call
        and a PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters including optional filters

        Returns:
            TaskResult containing ListContentStandardsResponse
        """
        task_name = "list_content_standards"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.list_content_standards(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, ListContentStandardsResponse)

    async def update_content_standards(
        self,
        request: UpdateContentStandardsRequest,
    ) -> TaskResult[UpdateContentStandardsResponse]:
        """
        Apply changes to an existing content standards configuration.

        The request is serialized to JSON-compatible params with ``None`` fields
        dropped. A PROTOCOL_REQUEST activity is emitted before the adapter call
        and a PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters including standards_id and updates

        Returns:
            TaskResult containing UpdateContentStandardsResponse
        """
        task_name = "update_content_standards"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.update_content_standards(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, UpdateContentStandardsResponse)

    async def calibrate_content(
        self,
        request: CalibrateContentRequest,
    ) -> TaskResult[CalibrateContentResponse]:
        """
        Evaluate a piece of content against configured standards.

        The content (an artifact or a URL) is assessed against the configured
        standards to decide whether it is suitable for ad placement.

        Args:
            request: Request parameters including content to evaluate

        Returns:
            TaskResult containing CalibrateContentResponse with verdict
        """
        task_name = "calibrate_content"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.calibrate_content(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, CalibrateContentResponse)

    async def validate_content_delivery(
        self,
        request: ValidateContentDeliveryRequest,
    ) -> TaskResult[ValidateContentDeliveryResponse]:
        """
        Check ad delivery records for compliance with content standards.

        Sends the delivery records to the agent so they can be validated
        against the content standards.

        Args:
            request: Request parameters including delivery records

        Returns:
            TaskResult containing ValidateContentDeliveryResponse
        """
        task_name = "validate_content_delivery"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call; both activities share the same operation id.
        sent = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(sent)

        reply = await self.adapter.validate_content_delivery(payload)

        # Report the adapter's raw status, stamped at the time of emission.
        received = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(received)

        return self.adapter._parse_response(reply, ValidateContentDeliveryResponse)

    async def get_media_buy_artifacts(
        self,
        request: GetMediaBuyArtifactsRequest,
    ) -> TaskResult[GetMediaBuyArtifactsResponse]:
        """
        Fetch the content artifacts tied to a media buy.

        Returns the content artifacts where ads were delivered for the
        media buy. The call is bracketed by a PROTOCOL_REQUEST /
        PROTOCOL_RESPONSE activity pair sharing one operation id.

        Args:
            request: Request parameters including media_buy_id

        Returns:
            TaskResult containing GetMediaBuyArtifactsResponse
        """
        task_name = "get_media_buy_artifacts"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.get_media_buy_artifacts(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, GetMediaBuyArtifactsResponse)

    # ========================================================================
    # V3 Protocol Methods - Sponsored Intelligence
    # ========================================================================

    async def si_get_offering(
        self,
        request: SiGetOfferingRequest,
    ) -> TaskResult[SiGetOfferingResponse]:
        """
        Fetch a sponsored intelligence offering.

        Looks up product/service offerings that can be presented in a
        sponsored intelligence session. The call is bracketed by a
        PROTOCOL_REQUEST / PROTOCOL_RESPONSE activity pair sharing one
        operation id.

        Args:
            request: Request parameters including brand context

        Returns:
            TaskResult containing SiGetOfferingResponse
        """
        task_name = "si_get_offering"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.si_get_offering(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, SiGetOfferingResponse)

    async def si_initiate_session(
        self,
        request: SiInitiateSessionRequest,
    ) -> TaskResult[SiInitiateSessionResponse]:
        """
        Open a sponsored intelligence session.

        Begins a conversational brand experience session with a user. The
        call is bracketed by a PROTOCOL_REQUEST / PROTOCOL_RESPONSE
        activity pair sharing one operation id.

        Args:
            request: Request parameters including identity and context

        Returns:
            TaskResult containing SiInitiateSessionResponse with session_id
        """
        task_name = "si_initiate_session"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.si_initiate_session(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, SiInitiateSessionResponse)

    async def si_send_message(
        self,
        request: SiSendMessageRequest,
    ) -> TaskResult[SiSendMessageResponse]:
        """
        Post a message to a sponsored intelligence session.

        Carries on the conversation in an active SI session. The call is
        bracketed by a PROTOCOL_REQUEST / PROTOCOL_RESPONSE activity pair
        sharing one operation id.

        Args:
            request: Request parameters including session_id and message

        Returns:
            TaskResult containing SiSendMessageResponse with brand response
        """
        task_name = "si_send_message"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.si_send_message(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, SiSendMessageResponse)

    async def si_terminate_session(
        self,
        request: SiTerminateSessionRequest,
    ) -> TaskResult[SiTerminateSessionResponse]:
        """
        Close a sponsored intelligence session.

        Ends an active SI session, optionally with follow-up actions. The
        call is bracketed by a PROTOCOL_REQUEST / PROTOCOL_RESPONSE
        activity pair sharing one operation id.

        Args:
            request: Request parameters including session_id and termination context

        Returns:
            TaskResult containing SiTerminateSessionResponse
        """
        task_name = "si_terminate_session"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.si_terminate_session(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, SiTerminateSessionResponse)

    # ========================================================================
    # V3 Governance Methods
    # ========================================================================

    async def get_creative_features(
        self,
        request: GetCreativeFeaturesRequest,
    ) -> TaskResult[GetCreativeFeaturesResponse]:
        """Evaluate governance features for a creative manifest.

        The request is dumped to JSON-compatible params (``None`` fields
        omitted), sent through the adapter, and bracketed by a
        PROTOCOL_REQUEST / PROTOCOL_RESPONSE activity pair sharing one
        operation id.

        Args:
            request: Request parameters for the feature evaluation

        Returns:
            TaskResult containing GetCreativeFeaturesResponse
        """
        task_name = "get_creative_features"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.get_creative_features(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, GetCreativeFeaturesResponse)

    async def sync_plans(
        self,
        request: SyncPlansRequest,
    ) -> TaskResult[SyncPlansResponse]:
        """Sync campaign governance plans to the governance agent.

        The request is dumped to JSON-compatible params (``None`` fields
        omitted), sent through the adapter, and bracketed by a
        PROTOCOL_REQUEST / PROTOCOL_RESPONSE activity pair sharing one
        operation id.

        Args:
            request: Request parameters describing the plans to sync

        Returns:
            TaskResult containing SyncPlansResponse
        """
        task_name = "sync_plans"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.sync_plans(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, SyncPlansResponse)

    async def check_governance(
        self,
        request: CheckGovernanceRequest,
    ) -> TaskResult[CheckGovernanceResponse]:
        """Check a proposed or committed action against campaign governance.

        The request is dumped to JSON-compatible params (``None`` fields
        omitted), sent through the adapter, and bracketed by a
        PROTOCOL_REQUEST / PROTOCOL_RESPONSE activity pair sharing one
        operation id.

        Args:
            request: Request parameters describing the action to check

        Returns:
            TaskResult containing CheckGovernanceResponse
        """
        task_name = "check_governance"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.check_governance(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, CheckGovernanceResponse)

    async def report_plan_outcome(
        self,
        request: ReportPlanOutcomeRequest,
    ) -> TaskResult[ReportPlanOutcomeResponse]:
        """Report the outcome of a governed action to the governance agent.

        The request is dumped to JSON-compatible params (``None`` fields
        omitted), sent through the adapter, and bracketed by a
        PROTOCOL_REQUEST / PROTOCOL_RESPONSE activity pair sharing one
        operation id.

        Args:
            request: Request parameters describing the outcome to report

        Returns:
            TaskResult containing ReportPlanOutcomeResponse
        """
        task_name = "report_plan_outcome"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.report_plan_outcome(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, ReportPlanOutcomeResponse)

    async def get_plan_audit_logs(
        self,
        request: GetPlanAuditLogsRequest,
    ) -> TaskResult[GetPlanAuditLogsResponse]:
        """Retrieve governance state and audit logs for one or more plans.

        The request is dumped to JSON-compatible params (``None`` fields
        omitted), sent through the adapter, and bracketed by a
        PROTOCOL_REQUEST / PROTOCOL_RESPONSE activity pair sharing one
        operation id.

        Args:
            request: Request parameters selecting the plans to inspect

        Returns:
            TaskResult containing GetPlanAuditLogsResponse
        """
        task_name = "get_plan_audit_logs"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.get_plan_audit_logs(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, GetPlanAuditLogsResponse)

    async def create_property_list(
        self,
        request: CreatePropertyListRequest,
    ) -> TaskResult[CreatePropertyListResponse]:
        """
        Create a new property list used for governance filtering.

        A property list describes a dynamic set of properties derived from
        filters, brand manifests, and feature requirements. The call is
        bracketed by a PROTOCOL_REQUEST / PROTOCOL_RESPONSE activity pair
        sharing one operation id.

        Args:
            request: Request parameters for creating the property list

        Returns:
            TaskResult containing CreatePropertyListResponse with list_id
        """
        task_name = "create_property_list"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.create_property_list(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, CreatePropertyListResponse)

    async def get_property_list(
        self,
        request: GetPropertyListRequest,
    ) -> TaskResult[GetPropertyListResponse]:
        """
        Fetch a property list, optionally resolved.

        With resolve=true the response carries the resolved property
        identifiers, i.e. the actual properties matching the list's
        filters. The call is bracketed by a PROTOCOL_REQUEST /
        PROTOCOL_RESPONSE activity pair sharing one operation id.

        Args:
            request: Request parameters including list_id and resolve flag

        Returns:
            TaskResult containing GetPropertyListResponse with identifiers
        """
        task_name = "get_property_list"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.get_property_list(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, GetPropertyListResponse)

    async def list_property_lists(
        self,
        request: ListPropertyListsRequest,
    ) -> TaskResult[ListPropertyListsResponse]:
        """
        Enumerate the property lists owned by a principal.

        Returns metadata for all property lists, optionally narrowed by
        principal or pagination parameters. The call is bracketed by a
        PROTOCOL_REQUEST / PROTOCOL_RESPONSE activity pair sharing one
        operation id.

        Args:
            request: Request parameters with optional filtering

        Returns:
            TaskResult containing ListPropertyListsResponse
        """
        task_name = "list_property_lists"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.list_property_lists(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, ListPropertyListsResponse)

    async def update_property_list(
        self,
        request: UpdatePropertyListRequest,
    ) -> TaskResult[UpdatePropertyListResponse]:
        """
        Modify an existing property list.

        Changes the filters, brand manifest, or other parameters of an
        existing property list. The call is bracketed by a
        PROTOCOL_REQUEST / PROTOCOL_RESPONSE activity pair sharing one
        operation id.

        Args:
            request: Request parameters with list_id and updates

        Returns:
            TaskResult containing UpdatePropertyListResponse
        """
        task_name = "update_property_list"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.update_property_list(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, UpdatePropertyListResponse)

    async def delete_property_list(
        self,
        request: DeletePropertyListRequest,
    ) -> TaskResult[DeletePropertyListResponse]:
        """
        Remove a property list.

        Any active subscriptions to the list will be terminated. The call
        is bracketed by a PROTOCOL_REQUEST / PROTOCOL_RESPONSE activity
        pair sharing one operation id.

        Args:
            request: Request parameters with list_id

        Returns:
            TaskResult containing DeletePropertyListResponse
        """
        task_name = "delete_property_list"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.delete_property_list(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, DeletePropertyListResponse)

    # ========================================================================
    # V3 Protocol Methods - Governance (Collection Lists)
    # ========================================================================

    async def create_collection_list(
        self,
        request: CreateCollectionListRequest,
    ) -> TaskResult[CreateCollectionListResponse]:
        """Create a new collection list used for governance filtering.

        A collection list describes a dynamic set of collections
        (properties, segments, etc.) that authorization rules and audience
        scoping can reference. The call is bracketed by a
        PROTOCOL_REQUEST / PROTOCOL_RESPONSE activity pair sharing one
        operation id.

        Args:
            request: Request parameters for creating the collection list

        Returns:
            TaskResult containing CreateCollectionListResponse with list_id
        """
        task_name = "create_collection_list"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.create_collection_list(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, CreateCollectionListResponse)

    async def get_collection_list(
        self,
        request: GetCollectionListRequest,
    ) -> TaskResult[GetCollectionListResponse]:
        """Fetch a collection list, optionally resolved.

        With resolve=true the response carries the resolved members of the
        collection list. The call is bracketed by a PROTOCOL_REQUEST /
        PROTOCOL_RESPONSE activity pair sharing one operation id.

        Args:
            request: Request parameters including list_id and resolve flag

        Returns:
            TaskResult containing GetCollectionListResponse
        """
        task_name = "get_collection_list"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.get_collection_list(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, GetCollectionListResponse)

    async def list_collection_lists(
        self,
        request: ListCollectionListsRequest,
    ) -> TaskResult[ListCollectionListsResponse]:
        """Enumerate the collection lists owned by a principal.

        The call is bracketed by a PROTOCOL_REQUEST / PROTOCOL_RESPONSE
        activity pair sharing one operation id.

        Args:
            request: Request parameters with optional filtering

        Returns:
            TaskResult containing ListCollectionListsResponse
        """
        task_name = "list_collection_lists"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.list_collection_lists(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, ListCollectionListsResponse)

    async def update_collection_list(
        self,
        request: UpdateCollectionListRequest,
    ) -> TaskResult[UpdateCollectionListResponse]:
        """Modify an existing collection list.

        The call is bracketed by a PROTOCOL_REQUEST / PROTOCOL_RESPONSE
        activity pair sharing one operation id.

        Args:
            request: Request parameters with list_id and updates

        Returns:
            TaskResult containing UpdateCollectionListResponse
        """
        task_name = "update_collection_list"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.update_collection_list(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, UpdateCollectionListResponse)

    async def delete_collection_list(
        self,
        request: DeleteCollectionListRequest,
    ) -> TaskResult[DeleteCollectionListResponse]:
        """Remove a collection list.

        The call is bracketed by a PROTOCOL_REQUEST / PROTOCOL_RESPONSE
        activity pair sharing one operation id.

        Args:
            request: Request parameters with list_id

        Returns:
            TaskResult containing DeleteCollectionListResponse
        """
        task_name = "delete_collection_list"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.delete_collection_list(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, DeleteCollectionListResponse)

    # ========================================================================
    # V3 Protocol Methods - Governance (Sync Governance)
    # ========================================================================

    async def sync_governance(
        self,
        request: SyncGovernanceRequest,
    ) -> TaskResult[SyncGovernanceResponse]:
        """Sync the governance agents attached to an account.

        Attaches, detaches, or replaces the set of governance agents that
        must be consulted for plan approval on an account. The call is
        bracketed by a PROTOCOL_REQUEST / PROTOCOL_RESPONSE activity pair
        sharing one operation id.

        Args:
            request: Request parameters with account and governance agents

        Returns:
            TaskResult containing SyncGovernanceResponse
        """
        task_name = "sync_governance"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.sync_governance(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, SyncGovernanceResponse)

    # ========================================================================
    # V3 Protocol Methods - Temporal Matching Protocol (TMP)
    # ========================================================================

    async def context_match(
        self,
        request: ContextMatchRequest,
    ) -> TaskResult[ContextMatchResponse]:
        """Match ad context to buyer packages.

        Evaluates the contextual signals of a publisher placement against
        the buyer's active packages and returns matching offers. The
        request is serialized with field aliases (``by_alias=True``), and
        the call is bracketed by a PROTOCOL_REQUEST / PROTOCOL_RESPONSE
        activity pair sharing one operation id.

        Args:
            request: Context match request with placement, property, and
                optional artifact refs, context signals, and geo data.

        Returns:
            TaskResult containing ContextMatchResponse with offers.
        """
        task_name = "context_match"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True, by_alias=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.context_match(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, ContextMatchResponse)

    async def identity_match(
        self,
        request: IdentityMatchRequest,
    ) -> TaskResult[IdentityMatchResponse]:
        """Match user identity for package eligibility.

        Evaluates a user identity token against all active packages for
        frequency capping and personalization. The request is serialized
        with field aliases (``by_alias=True``), and the call is bracketed
        by a PROTOCOL_REQUEST / PROTOCOL_RESPONSE activity pair sharing one
        operation id.

        Args:
            request: Identity match request with user_token, uid_type,
                and package_ids.

        Returns:
            TaskResult containing IdentityMatchResponse with eligible_package_ids.
        """
        task_name = "identity_match"
        op_id = create_operation_id()
        payload = request.model_dump(mode="json", exclude_none=True, by_alias=True)

        # Announce the outgoing call before touching the adapter.
        started = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(started)

        reply = await self.adapter.identity_match(payload)

        # Report the adapter's status under the same operation id.
        finished = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(finished)

        return self.adapter._parse_response(reply, IdentityMatchResponse)

    # ========================================================================
    # V3 Protocol Methods - Brand Rights
    # ========================================================================

    async def get_brand_identity(
        self,
        request: GetBrandIdentityRequest,
    ) -> TaskResult[GetBrandIdentityResponse]:
        """Get brand identity information.

        Fetches brand identity data including logos, colors, fonts,
        voice synthesis config, and rights availability.

        The request is serialized to a JSON-compatible dict (``None`` values
        dropped) and handed to the adapter. A ``PROTOCOL_REQUEST`` activity is
        emitted before the adapter call and a ``PROTOCOL_RESPONSE`` activity
        (carrying the raw status) after it.

        Args:
            request: Request with brand_id and optional fields filter.

        Returns:
            TaskResult containing GetBrandIdentityResponse.
        """
        task_name = "get_brand_identity"
        op_id = create_operation_id()
        wire_params = request.model_dump(mode="json", exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        adapter_result = await self.adapter.get_brand_identity(wire_params)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=adapter_result.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(adapter_result, GetBrandIdentityResponse)

    async def get_rights(
        self,
        request: GetRightsRequest,
    ) -> TaskResult[GetRightsResponse]:
        """Get available rights for licensing.

        Searches rights offerings with a natural language query, filtered by
        type, uses, countries, and buyer compatibility.

        The request is serialized to a JSON-compatible dict (``None`` values
        dropped) and handed to the adapter. A ``PROTOCOL_REQUEST`` activity is
        emitted before the adapter call and a ``PROTOCOL_RESPONSE`` activity
        (carrying the raw status) after it.

        Args:
            request: Request with query, uses, and optional filters.

        Returns:
            TaskResult containing GetRightsResponse with matched rights.
        """
        task_name = "get_rights"
        op_id = create_operation_id()
        wire_params = request.model_dump(mode="json", exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        adapter_result = await self.adapter.get_rights(wire_params)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=adapter_result.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(adapter_result, GetRightsResponse)

    async def acquire_rights(
        self,
        request: AcquireRightsRequest,
    ) -> TaskResult[AcquireRightsResponse]:
        """Acquire rights for brand content usage.

        This is a binding contractual request to license rights for a
        campaign; the response returns credentials for generating
        rights-cleared content.

        The request is serialized to a JSON-compatible dict (``None`` values
        dropped) and handed to the adapter. A ``PROTOCOL_REQUEST`` activity is
        emitted before the adapter call and a ``PROTOCOL_RESPONSE`` activity
        (carrying the raw status) after it.

        Args:
            request: Request with rights_id, pricing_option_id, buyer,
                campaign, and revocation_webhook.

        Returns:
            TaskResult containing AcquireRightsResponse (acquired,
            pending_approval, rejected, or error).
        """
        task_name = "acquire_rights"
        op_id = create_operation_id()
        wire_params = request.model_dump(mode="json", exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        adapter_result = await self.adapter.acquire_rights(wire_params)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=adapter_result.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(adapter_result, AcquireRightsResponse)

    async def update_rights(
        self,
        request: UpdateRightsRequest,
    ) -> TaskResult[UpdateRightsResponse]:
        """Update terms of an existing rights acquisition.

        Changes a previously acquired rights record — typically extending
        the ``end_date``, raising the ``impression_cap``, pausing/unpausing
        via ``paused``, or swapping to a compatible ``pricing_option_id``.
        This is a partial update: pass only the fields you want to change.

        Failure modes (surface as ``TaskResult`` with ``success=False``):

        * The acquisition is expired or revoked — the seller rejects the
          update outright; mint a fresh ``acquire_rights`` instead.
        * A ``pricing_option_id`` swap to an incompatible option — rejected;
          the new option's terms must be a strict superset / compatible
          with the original acquisition.
        * No partial-state mutations on rejection: the acquisition remains
          at its prior state when any field fails validation.

        The request is serialized to a JSON-compatible dict (``None`` values
        dropped) and handed to the adapter. A ``PROTOCOL_REQUEST`` activity is
        emitted before the adapter call and a ``PROTOCOL_RESPONSE`` activity
        (carrying the raw status) after it.

        Args:
            request: Request with ``rights_id`` and at least one mutable
                field (``end_date``, ``impression_cap``, ``paused``, or
                ``pricing_option_id``).

        Returns:
            TaskResult containing UpdateRightsResponse (updated or error).
        """
        task_name = "update_rights"
        op_id = create_operation_id()
        wire_params = request.model_dump(mode="json", exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        adapter_result = await self.adapter.update_rights(wire_params)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=adapter_result.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(adapter_result, UpdateRightsResponse)

    # ========================================================================
    # V3 Protocol Methods - Compliance
    # ========================================================================

    async def comply_test_controller(
        self,
        request: ComplyTestControllerRequest,
    ) -> TaskResult[ComplyTestControllerResponse]:
        """Compliance test controller for sandbox testing.

        Lets sellers simulate state transitions and delivery data in a
        sandbox environment for compliance testing.

        The request is serialized to a JSON-compatible dict (``None`` values
        dropped) and handed to the adapter. A ``PROTOCOL_REQUEST`` activity is
        emitted before the adapter call and a ``PROTOCOL_RESPONSE`` activity
        (carrying the raw status) after it.

        Args:
            request: Request specifying scenario and parameters.

        Returns:
            TaskResult containing ComplyTestControllerResponse.
        """
        task_name = "comply_test_controller"
        op_id = create_operation_id()
        wire_params = request.model_dump(mode="json", exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        adapter_result = await self.adapter.comply_test_controller(wire_params)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=adapter_result.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(adapter_result, ComplyTestControllerResponse)

    async def list_tools(self) -> list[str]:
        """
        List available tools from the agent.

        Returns:
            List of tool names
        """
        return await self.adapter.list_tools()

    async def get_info(self) -> dict[str, Any]:
        """
        Get agent information including AdCP extension metadata.

        Returns agent card information including:
        - Agent name, description, version
        - Protocol type (mcp or a2a)
        - AdCP version (from extensions.adcp.adcp_version)
        - Supported protocols (from extensions.adcp.protocols_supported)
        - Available tools/skills

        Returns:
            Dictionary with agent metadata
        """
        return await self.adapter.get_agent_info()

    async def close(self) -> None:
        """Close the adapter and clean up resources."""
        if hasattr(self.adapter, "close"):
            logger.debug(f"Closing adapter for agent {self.agent_config.id}")
            await self.adapter.close()

    async def __aenter__(self) -> ADCPClient:
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Async context manager exit."""
        await self.close()

    def _verify_webhook_signature(
        self,
        payload: dict[str, Any],
        signature: str,
        timestamp: str,
        raw_body: bytes | str | None = None,
    ) -> bool:
        """
        Verify HMAC-SHA256 signature of webhook payload.

        The verification algorithm matches get_adcp_signed_headers_for_webhook:
        1. Constructs message as "{timestamp}.{raw_http_body_bytes}"
        2. HMAC-SHA256 signs with the shared secret
        3. Compares against the provided signature (with "sha256=" prefix stripped)
           using constant-time comparison.

        Per AdCP spec (adcontextprotocol/adcp#2478): verifiers MUST use the raw
        HTTP body bytes captured before any JSON parse; they SHOULD NOT
        re-serialize a parsed payload to reconstruct the signed bytes, because
        re-serialization silently fails against signers whose output differs in
        separator choice, key order, unicode escapes, or number formatting —
        masking signer bugs the verifier should surface. Callers that genuinely
        cannot capture raw bytes MUST fail closed.

        This implementation therefore rejects verification attempts that don't
        supply ``raw_body``. Capture it from your framework's pre-parse hook
        (FastAPI ``Request.body()``, Flask ``request.get_data(cache=True)``,
        aiohttp ``Request.read()``, Express ``express.raw()``).

        Args:
            payload: Parsed webhook payload dict (not used for signing; kept
                for signature parity with callers, but verification derives
                solely from ``raw_body``).
            signature: Signature to verify (with or without "sha256=" prefix)
            timestamp: Unix timestamp in seconds from X-AdCP-Timestamp header
            raw_body: Raw HTTP request body bytes as received on the wire,
                captured before any JSON parse. Required.

        Returns:
            True if signature is valid, False otherwise (including when
            ``raw_body`` is missing — fails closed per spec).
        """
        if not self.webhook_secret:
            logger.warning("Webhook signature verification skipped: no webhook_secret configured")
            return True

        # Fail closed per adcontextprotocol/adcp#2478: verifiers that cannot
        # capture raw bytes MUST reject, surfacing the infrastructure gap
        # rather than silently reconstructing a signed body that may diverge
        # from the bytes the signer actually hashed.
        if raw_body is None:
            logger.error(
                "Webhook signature verification failed: raw_body is required. "
                "Capture the raw HTTP body pre-parse and pass it to "
                "handle_webhook(raw_body=...). See "
                "https://adcontextprotocol.org/docs/building/implementation/security"
                "#legacy-hmac-sha256-fallback-deprecated-removed-in-40"
            )
            return False

        # Reject stale or future timestamps to prevent replay attacks
        try:
            ts = int(timestamp)
        except (ValueError, TypeError):
            return False
        now = int(time.time())
        if abs(now - ts) > self.webhook_timestamp_tolerance:
            return False

        # Strip "sha256=" prefix if present
        if signature.startswith("sha256="):
            signature = signature[7:]

        payload_str = raw_body.decode("utf-8") if isinstance(raw_body, bytes) else raw_body

        # Construct signed message: timestamp.payload
        signed_message = f"{timestamp}.{payload_str}"

        # Generate expected signature
        expected_signature = hmac.new(
            self.webhook_secret.encode("utf-8"), signed_message.encode("utf-8"), hashlib.sha256
        ).hexdigest()

        return hmac.compare_digest(signature, expected_signature)

    def _parse_webhook_result(
        self,
        task_id: str,
        task_type: str,
        operation_id: str,
        status: GeneratedTaskStatus,
        result: Any,
        timestamp: datetime | str,
        message: str | None,
        context_id: str | None,
    ) -> TaskResult[AdcpAsyncResponseData]:
        """
        Parse webhook data into typed TaskResult based on task_type.

        For a completed task with a non-None result and a known ``task_type``,
        the result is parsed into the matching response type. If the task type
        is unknown, the status is not completed, or parsing raises
        ``ValueError``, the raw ``result`` is returned untyped with the status
        mapped onto the core ``TaskStatus`` enum (unmapped statuses become
        ``FAILED``).

        Both return paths carry the same metadata keys (``task_id``,
        ``operation_id``, ``timestamp``, ``message``, ``context_id``).

        Args:
            task_id: Unique identifier for this task
            task_type: Task type from application routing (e.g., "get_products")
            operation_id: Operation identifier from application routing
            status: Current task status
            result: Task-specific payload (AdCP response data); may be a
                model object or a plain dict
            timestamp: ISO 8601 timestamp when webhook was generated
            message: Human-readable summary of task state
            context_id: Session/conversation identifier

        Returns:
            TaskResult with task-specific typed response data. On the untyped
            path, ``error`` is the ``message`` of the first entry in the
            result's ``errors`` (attribute or dict key), when present.

        Note:
            This method works with both MCP and A2A protocols by accepting
            protocol-agnostic parameters rather than protocol-specific objects.
        """
        from adcp.utils.response_parser import parse_json_or_text

        # Map task types to their response types (using string literals, not enum)
        # Note: Some response types are Union types (e.g., ActivateSignalResponse = Success | Error)
        response_type_map: dict[str, type[BaseModel] | Any] = {
            # Core operations
            "get_products": GetProductsResponse,
            "list_creative_formats": ListCreativeFormatsResponse,
            "sync_creatives": SyncCreativesResponse,
            "list_creatives": ListCreativesResponse,
            "build_creative": BuildCreativeResponse,
            "preview_creative": PreviewCreativeResponse,
            "create_media_buy": CreateMediaBuyResponse,
            "update_media_buy": UpdateMediaBuyResponse,
            "get_media_buy_delivery": GetMediaBuyDeliveryResponse,
            "get_media_buys": GetMediaBuysResponse,
            "get_signals": GetSignalsResponse,
            "activate_signal": ActivateSignalResponse,
            "provide_performance_feedback": ProvidePerformanceFeedbackResponse,
            "report_usage": ReportUsageResponse,
            "get_account_financials": GetAccountFinancialsResponse,
            "list_accounts": ListAccountsResponse,
            "sync_accounts": SyncAccountsResponse,
            "log_event": LogEventResponse,
            "sync_event_sources": SyncEventSourcesResponse,
            "sync_audiences": SyncAudiencesResponse,
            "sync_catalogs": SyncCatalogsResponse,
            "get_creative_delivery": GetCreativeDeliveryResponse,
            # V3 Protocol Discovery
            "get_adcp_capabilities": GetAdcpCapabilitiesResponse,
            # V3 Content Standards
            "create_content_standards": CreateContentStandardsResponse,
            "get_content_standards": GetContentStandardsResponse,
            "list_content_standards": ListContentStandardsResponse,
            "update_content_standards": UpdateContentStandardsResponse,
            "calibrate_content": CalibrateContentResponse,
            "validate_content_delivery": ValidateContentDeliveryResponse,
            "get_media_buy_artifacts": GetMediaBuyArtifactsResponse,
            # V3 Sponsored Intelligence
            "si_get_offering": SiGetOfferingResponse,
            "si_initiate_session": SiInitiateSessionResponse,
            "si_send_message": SiSendMessageResponse,
            "si_terminate_session": SiTerminateSessionResponse,
            # V3 Governance
            "get_creative_features": GetCreativeFeaturesResponse,
            "sync_plans": SyncPlansResponse,
            "check_governance": CheckGovernanceResponse,
            "report_plan_outcome": ReportPlanOutcomeResponse,
            "get_plan_audit_logs": GetPlanAuditLogsResponse,
            "create_property_list": CreatePropertyListResponse,
            "get_property_list": GetPropertyListResponse,
            "list_property_lists": ListPropertyListsResponse,
            "update_property_list": UpdatePropertyListResponse,
            "delete_property_list": DeletePropertyListResponse,
            # TMP
            "context_match": ContextMatchResponse,
            "identity_match": IdentityMatchResponse,
            # Brand Rights
            "get_brand_identity": GetBrandIdentityResponse,
            "get_rights": GetRightsResponse,
            "acquire_rights": AcquireRightsResponse,
            "update_rights": UpdateRightsResponse,
            # Compliance
            "comply_test_controller": ComplyTestControllerResponse,
        }

        # Handle completed tasks with result parsing
        if status == GeneratedTaskStatus.completed and result is not None:
            response_type = response_type_map.get(task_type)
            if response_type:
                try:
                    parsed_result: Any = parse_json_or_text(result, response_type)
                    return TaskResult[AdcpAsyncResponseData](
                        status=TaskStatus.COMPLETED,
                        data=parsed_result,
                        success=True,
                        metadata={
                            "task_id": task_id,
                            "operation_id": operation_id,
                            "timestamp": timestamp,
                            "message": message,
                            # Kept in step with the untyped path below so
                            # callers can correlate by context on success too.
                            "context_id": context_id,
                        },
                    )
                except ValueError as e:
                    logger.warning(f"Failed to parse webhook result: {e}")
                    # Fall through to untyped result

        # Handle failed, input-required, or unparseable results
        # Convert status to core TaskStatus enum
        status_map = {
            GeneratedTaskStatus.completed: TaskStatus.COMPLETED,
            GeneratedTaskStatus.submitted: TaskStatus.SUBMITTED,
            GeneratedTaskStatus.working: TaskStatus.WORKING,
            GeneratedTaskStatus.failed: TaskStatus.FAILED,
            GeneratedTaskStatus.input_required: TaskStatus.NEEDS_INPUT,
        }
        task_status = status_map.get(status, TaskStatus.FAILED)

        # Extract error message from result.errors if present. The result may
        # be a model object (attribute access) or a plain dict (the A2A handler
        # passes DataPart data through as a dict), so both shapes are checked.
        error_message: str | None = None
        if isinstance(result, dict):
            errors = result.get("errors")
        else:
            errors = getattr(result, "errors", None)
        if errors:
            try:
                first_error = errors[0]
            except (IndexError, KeyError, TypeError):
                # ``errors`` is not an indexable sequence; nothing to report.
                first_error = None
            if isinstance(first_error, dict):
                dict_message = first_error.get("message")
                # Only a string message is surfaced from untyped dict input.
                if isinstance(dict_message, str):
                    error_message = dict_message
            elif first_error is not None and hasattr(first_error, "message"):
                error_message = first_error.message

        return TaskResult[AdcpAsyncResponseData](
            status=task_status,
            data=result,
            success=status == GeneratedTaskStatus.completed,
            error=error_message,
            metadata={
                "task_id": task_id,
                "operation_id": operation_id,
                "timestamp": timestamp,
                "message": message,
                "context_id": context_id,
            },
        )

    async def _handle_mcp_webhook(
        self,
        payload: dict[str, Any],
        task_type: str,
        operation_id: str,
        signature: str | None,
        timestamp: str | None = None,
        raw_body: bytes | str | None = None,
    ) -> TaskResult[AdcpAsyncResponseData]:
        """
        Process an MCP webhook that arrived as an HTTP POST.

        When a ``webhook_secret`` is configured, the signature and timestamp
        headers are mandatory and are verified before the payload is
        validated. Without a secret, no signature check is performed. After
        validation a ``WEBHOOK_RECEIVED`` activity is emitted and the webhook
        fields are handed to ``_parse_webhook_result``.

        Args:
            payload: Webhook payload dict
            task_type: Task type from application routing
            operation_id: Operation identifier from application routing
            signature: Optional HMAC-SHA256 signature for verification (X-AdCP-Signature header)
            timestamp: Optional Unix timestamp for signature verification (X-AdCP-Timestamp header)
            raw_body: Optional raw HTTP request body for signature verification

        Returns:
            TaskResult with parsed task-specific response data

        Raises:
            ADCPWebhookSignatureError: If signature verification fails
            ValidationError: If payload doesn't match McpWebhookPayload schema
        """
        from adcp.types.generated_poc.core.mcp_webhook_payload import McpWebhookPayload

        # A configured secret makes signed delivery mandatory.
        if self.webhook_secret:
            if not (signature and timestamp):
                raise ADCPWebhookSignatureError(
                    "Webhook signature and timestamp headers are required"
                )
            signature_ok = self._verify_webhook_signature(
                payload, signature, timestamp, raw_body
            )
            if not signature_ok:
                logger.warning(
                    f"Webhook signature verification failed for agent {self.agent_config.id}"
                )
                raise ADCPWebhookSignatureError("Invalid webhook signature")

        # Schema validation happens only after the signature gate has passed.
        parsed = McpWebhookPayload.model_validate(payload)

        # Surface the delivery to the activity callback for monitoring.
        received_event = Activity(
            type=ActivityType.WEBHOOK_RECEIVED,
            operation_id=operation_id,
            agent_id=self.agent_config.id,
            task_type=task_type,
            timestamp=datetime.now(timezone.utc).isoformat(),
            metadata={"payload": payload, "protocol": "mcp"},
        )
        self._emit_activity(received_event)

        # Hand the protocol-agnostic fields to the shared result parser.
        return self._parse_webhook_result(
            task_id=parsed.task_id,
            task_type=task_type,
            operation_id=operation_id,
            status=parsed.status,
            result=parsed.result,
            timestamp=parsed.timestamp,
            message=parsed.message,
            context_id=parsed.context_id,
        )

    async def _handle_a2a_webhook(
        self, payload: Task | TaskStatusUpdateEvent, task_type: str, operation_id: str
    ) -> TaskResult[AdcpAsyncResponseData]:
        """
        Process an A2A webhook carried by a Task or TaskStatusUpdateEvent.

        Per A2A specification:
        - Terminated statuses (completed, failed): Payload is Task with artifacts[].parts[]
        - Intermediate statuses (working, input-required, submitted):
        Payload is TaskStatusUpdateEvent with status.message.parts[]

        For a TaskStatusUpdateEvent the parts come from ``status.message``;
        for a Task they come from the last artifact. Within the chosen parts,
        the last DataPart supplies the AdCP data (unwrapping a
        ``{"response": ...}`` envelope when present) and the first TextPart
        supplies the human-readable message. A missing status is treated as
        "failed", and a missing status timestamp falls back to the current
        UTC time.

        Args:
            payload: A2A Task or TaskStatusUpdateEvent object
            task_type: Task type from application routing
            operation_id: Operation identifier from application routing

        Returns:
            TaskResult with parsed task-specific response data

        Note:
            Signature verification is NOT applicable for A2A webhooks
            as they arrive through authenticated A2A connections, not HTTP.
        """
        from a2a.types import DataPart, TextPart

        def _read_parts(parts: Any) -> tuple[Any, str | None]:
            """Return (AdCP data from the last DataPart, text of the first TextPart)."""
            data: Any = None
            structured = [p.root for p in parts if isinstance(p.root, DataPart)]
            if structured:
                # Last DataPart is authoritative
                data = structured[-1].data
                # Unwrap {"response": {...}} wrapper if present (ADK pattern)
                if isinstance(data, dict) and "response" in data:
                    data = data["response"]
            text = next((p.root.text for p in parts if isinstance(p.root, TextPart)), None)
            return data, text

        adcp_data: Any = None
        text_message: str | None = None
        task_id: str
        context_id: str | None
        status_state: str
        timestamp: datetime | str
        selected_parts: Any = None

        if isinstance(payload, TaskStatusUpdateEvent):
            # Intermediate status: parts live on status.message
            task_id = payload.task_id
            context_id = payload.context_id
            update_status = payload.status
            status_state = update_status.state if update_status else "failed"
            if update_status and update_status.timestamp:
                timestamp = update_status.timestamp
            else:
                timestamp = datetime.now(timezone.utc)

            if update_status and update_status.message and update_status.message.parts:
                selected_parts = update_status.message.parts
        else:
            # Terminated status (Task): parts live on the artifacts
            task_id = payload.id
            context_id = payload.context_id
            final_status = payload.status
            status_state = final_status.state if final_status else "failed"
            if final_status and final_status.timestamp:
                timestamp = final_status.timestamp
            else:
                timestamp = datetime.now(timezone.utc)

            # Following A2A spec: use last artifact (most recent in streaming scenarios)
            if payload.artifacts:
                newest_artifact = payload.artifacts[-1]
                if newest_artifact.parts:
                    selected_parts = newest_artifact.parts

        if selected_parts:
            adcp_data, text_message = _read_parts(selected_parts)

        # Map A2A status.state to GeneratedTaskStatus enum
        state_to_status = {
            "completed": GeneratedTaskStatus.completed,
            "submitted": GeneratedTaskStatus.submitted,
            "working": GeneratedTaskStatus.working,
            "failed": GeneratedTaskStatus.failed,
            "input-required": GeneratedTaskStatus.input_required,
            "input_required": GeneratedTaskStatus.input_required,  # Handle both formats
        }
        mapped_status = state_to_status.get(status_state, GeneratedTaskStatus.failed)

        if isinstance(payload, TaskStatusUpdateEvent):
            payload_type = "TaskStatusUpdateEvent"
        else:
            payload_type = "Task"

        # Emit activity for monitoring
        received_event = Activity(
            type=ActivityType.WEBHOOK_RECEIVED,
            operation_id=operation_id,
            agent_id=self.agent_config.id,
            task_type=task_type,
            timestamp=datetime.now(timezone.utc).isoformat(),
            metadata={
                "task_id": task_id,
                "protocol": "a2a",
                "payload_type": payload_type,
            },
        )
        self._emit_activity(received_event)

        # Hand the extracted, protocol-agnostic fields to the shared parser
        return self._parse_webhook_result(
            task_id=task_id,
            task_type=task_type,
            operation_id=operation_id,
            status=mapped_status,
            result=adcp_data,
            timestamp=timestamp,
            message=text_message,
            context_id=context_id,
        )

    async def handle_webhook(
        self,
        payload: dict[str, Any] | Task | TaskStatusUpdateEvent,
        task_type: str,
        operation_id: str,
        signature: str | None = None,
        timestamp: str | None = None,
        raw_body: bytes | str | None = None,
    ) -> TaskResult[AdcpAsyncResponseData]:
        """
        Process an incoming webhook and return a typed result.

        Single entry point for webhooks from both supported protocols. The
        protocol is selected from the runtime type of ``payload``:

        - MCP webhooks: HTTP POST with a dict payload, optional HMAC signature
        - A2A webhooks: ``Task`` or ``TaskStatusUpdateEvent`` objects, depending
          on the task status

        Either way the caller receives the same TaskResult structure carrying
        typed AdCP response data.

        Args:
            payload: Webhook payload - one of:
                - dict[str, Any]: MCP webhook payload from HTTP POST
                - Task: A2A webhook for terminal statuses (completed, failed)
                - TaskStatusUpdateEvent: A2A webhook for intermediate statuses
                  (working, input-required, submitted)
            task_type: Task type taken from application routing
                (e.g., "get_products"). Applications should extract it from
                their URL routing pattern:
                /webhook/{task_type}/{agent_id}/{operation_id}
            operation_id: Operation identifier taken from application routing.
                Correlates webhook notifications with the original task
                submission.
            signature: Optional HMAC-SHA256 signature for MCP webhook
                verification (X-AdCP-Signature header). Not forwarded for A2A
                webhooks.
            timestamp: Optional Unix timestamp (seconds) for MCP webhook
                signature verification (X-AdCP-Timestamp header). Required
                when signature is provided. Not forwarded for A2A webhooks.
            raw_body: Optional raw HTTP request body for signature
                verification. When provided it is used directly instead of
                re-serializing the payload, which avoids cross-language JSON
                serialization mismatches. Strongly recommended for production
                use. Not forwarded for A2A webhooks.

        Returns:
            TaskResult with parsed task-specific response data. The structure
            is identical regardless of protocol.

        Raises:
            ADCPWebhookSignatureError: If MCP signature verification fails
            ValidationError: If MCP payload doesn't match WebhookPayload schema

        Note:
            task_type and operation_id were deprecated from the webhook payload
            per AdCP specification, so applications must obtain them from URL
            routing (or their own task tracking) and pass them explicitly.

        Examples:
            MCP webhook (HTTP endpoint):
            >>> @app.post("/webhook/{task_type}/{agent_id}/{operation_id}")
            >>> async def webhook_handler(task_type: str, operation_id: str, request: Request):
            >>>     raw_body = await request.body()
            >>>     payload = json.loads(raw_body)
            >>>     signature = request.headers.get("X-AdCP-Signature")
            >>>     timestamp = request.headers.get("X-AdCP-Timestamp")
            >>>     result = await client.handle_webhook(
            >>>         payload, task_type, operation_id, signature, timestamp,
            >>>         raw_body=raw_body,
            >>>     )
            >>>     if result.success:
            >>>         print(f"Task completed: {result.data}")

            A2A webhook with Task (terminal status):
            >>> async def on_task_completed(task: Task):
            >>>     # Look up task_type and operation_id in your app's task tracking
            >>>     task_type = your_task_registry.get_type(task.id)
            >>>     operation_id = your_task_registry.get_operation_id(task.id)
            >>>     result = await client.handle_webhook(
            >>>         task, task_type, operation_id
            >>>     )
            >>>     if result.success:
            >>>         print(f"Task completed: {result.data}")

            A2A webhook with TaskStatusUpdateEvent (intermediate status):
            >>> async def on_task_update(event: TaskStatusUpdateEvent):
            >>>     # Look up task_type and operation_id in your app's task tracking
            >>>     task_type = your_task_registry.get_type(event.task_id)
            >>>     operation_id = your_task_registry.get_operation_id(event.task_id)
            >>>     result = await client.handle_webhook(
            >>>         event, task_type, operation_id
            >>>     )
            >>>     if result.status == GeneratedTaskStatus.working:
            >>>         print(f"Task still working: {result.metadata.get('message')}")
        """
        # Anything that is not an A2A object is routed to the MCP handler,
        # together with the optional signature-verification inputs.
        if not isinstance(payload, (Task, TaskStatusUpdateEvent)):
            return await self._handle_mcp_webhook(
                payload, task_type, operation_id, signature, timestamp, raw_body
            )

        # A2A path: signature, timestamp and raw_body are not forwarded.
        return await self._handle_a2a_webhook(payload, task_type, operation_id)

Client for interacting with a single AdCP agent.

Initialize ADCP client for a single agent.

Args

agent_config
Agent configuration
webhook_url_template
Template for webhook URLs with {agent_id}, {task_type}, {operation_id}
webhook_secret
Secret for webhook signature verification
on_activity
Callback for activity events
webhook_timestamp_tolerance
Maximum age (in seconds) for webhook timestamps. Webhooks with timestamps older than this or more than this far in the future are rejected. Defaults to 300 (5 minutes).
capabilities_ttl
Time-to-live in seconds for cached capabilities (default: 1 hour)
validate_features
When True, automatically check that the seller supports required features before making task calls (e.g., sync_audiences requires audience_targeting). Requires capabilities to have been fetched first.
strict_idempotency
When True, verify the seller declared adcp.idempotency.replay_ttl_seconds in capabilities before any mutating call. Fetches capabilities lazily on first use. Raises IdempotencyUnsupportedError if the declaration is missing — sellers that don't declare it provide no retry-safety guarantee per AdCP #2315. Defaults to False for backward compatibility.
signing
Optional RFC 9421 request-signing config. When provided, the client automatically attaches Signature / Signature-Input / Content-Digest headers to operations the seller's request_signing capability lists in required_for, warn_for, or supported_for. The seller's covers_content_digest policy determines whether the body is bound to the signature. Generate a key with adcp-keygen and publish the public JWK at your jwks_uri. Supported on both A2A and MCP (mcp_transport="streamable_http"); SSE-transport MCP logs a warning and falls through unsigned.

Instance variables

prop capabilities : GetAdcpCapabilitiesResponse | None
Expand source code
@property
def capabilities(self) -> GetAdcpCapabilitiesResponse | None:
    """Cached capabilities response, or ``None`` if none has been fetched yet."""
    cached = self._capabilities
    return cached

Return cached capabilities, or None if not yet fetched.

prop feature_resolver : FeatureResolver | None
Expand source code
@property
def feature_resolver(self) -> FeatureResolver | None:
    """FeatureResolver built for the cached capabilities, or ``None``."""
    resolver = self._feature_resolver
    return resolver

Return the FeatureResolver for cached capabilities, or None.

Methods

async def acquire_rights(self, request: AcquireRightsRequest) ‑> TaskResult[Union[AcquireRightsResponse1, AcquireRightsResponse2, AcquireRightsResponse3, AcquireRightsResponse4]]
Expand source code
async def acquire_rights(
    self,
    request: AcquireRightsRequest,
) -> TaskResult[AcquireRightsResponse]:
    """License rights for brand content usage.

    This is a binding contractual request to license rights for a campaign;
    the response returns credentials for generating rights-cleared content.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it,
    both under the same operation id.

    Args:
        request: Request with rights_id, pricing_option_id, buyer,
            campaign, and revocation_webhook.

    Returns:
        TaskResult containing AcquireRightsResponse (acquired,
        pending_approval, rejected, or error).
    """
    task_name = "acquire_rights"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    # Emit the request activity before handing off to the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    response = await self.adapter.acquire_rights(payload)

    # Emit the response activity with the status reported by the adapter.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=response.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(response, AcquireRightsResponse)

Acquire rights for brand content usage.

Binding contractual request to license rights for a campaign. Returns credentials for generating rights-cleared content.

Args

request
Request with rights_id, pricing_option_id, buyer, campaign, and revocation_webhook.

Returns

TaskResult containing AcquireRightsResponse (acquired, pending_approval, rejected, or error).

async def activate_signal(self, request: ActivateSignalRequest) ‑> TaskResult[Union[ActivateSignalResponse1, ActivateSignalResponse2]]
Expand source code
async def activate_signal(
    self,
    request: ActivateSignalRequest,
) -> TaskResult[ActivateSignalResponse]:
    """
    Activate a signal.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it,
    both under the same operation id.

    Args:
        request: Request parameters

    Returns:
        TaskResult containing ActivateSignalResponse
    """
    task_name = "activate_signal"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    # Emit the request activity before handing off to the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    response = await self.adapter.activate_signal(payload)

    # Emit the response activity with the status reported by the adapter.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=response.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(response, ActivateSignalResponse)

Activate Signal.

Args

request
Request parameters

Returns

TaskResult containing ActivateSignalResponse

async def build_creative(self, request: BuildCreativeRequest) ‑> TaskResult[Union[BuildCreativeResponse1, BuildCreativeResponse2, BuildCreativeResponse3]]
Expand source code
async def build_creative(
    self,
    request: BuildCreativeRequest,
) -> TaskResult[BuildCreativeResponse]:
    """
    Build production-ready creative assets.

    Asks the creative agent to produce the final deliverable assets in the
    target format (e.g., VAST, DAAST, HTML5). Typically called once a
    creative manifest has been previewed and approved.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it,
    both under the same operation id.

    Args:
        request: Creative build parameters including:
            - manifest: Creative manifest with brand info and content
            - target_format_id: Desired output format identifier
            - inputs: Optional user-provided inputs for template variables
            - deployment: Platform or agent deployment configuration

    Returns:
        TaskResult containing BuildCreativeResponse with:
            - assets: Production-ready creative files (URLs or inline content)
            - format_id: The generated format identifier
            - manifest: The creative manifest used for generation
            - metadata: Additional platform-specific details

    Example:
        >>> from adcp import ADCPClient, BuildCreativeRequest
        >>> client = ADCPClient(agent_config)
        >>> request = BuildCreativeRequest(
        ...     manifest=creative_manifest,
        ...     target_format_id="vast_2.0",
        ...     inputs={"duration": 30}
        ... )
        >>> result = await client.build_creative(request)
        >>> if result.success:
        ...     vast_url = result.data.assets[0].url
    """
    task_name = "build_creative"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    # Emit the request activity before handing off to the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    response = await self.adapter.build_creative(payload)

    # Emit the response activity with the status reported by the adapter.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=response.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(response, BuildCreativeResponse)

Generate production-ready creative assets.

Requests the creative agent to build final deliverable assets in the target format (e.g., VAST, DAAST, HTML5). This is typically called after previewing and approving a creative manifest.

Args

request
Creative build parameters, including: manifest (creative manifest with brand info and content); target_format_id (desired output format identifier); inputs (optional user-provided inputs for template variables); deployment (platform or agent deployment configuration).

Returns

TaskResult containing BuildCreativeResponse with: assets (production-ready creative files, as URLs or inline content); format_id (the generated format identifier); manifest (the creative manifest used for generation); metadata (additional platform-specific details).

Example

>>> from adcp import ADCPClient, BuildCreativeRequest
>>> client = ADCPClient(agent_config)
>>> request = BuildCreativeRequest(
...     manifest=creative_manifest,
...     target_format_id="vast_2.0",
...     inputs={"duration": 30}
... )
>>> result = await client.build_creative(request)
>>> if result.success:
...     vast_url = result.data.assets[0].url
async def calibrate_content(self, request: CalibrateContentRequest) ‑> TaskResult[Union[CalibrateContentResponse1, CalibrateContentResponse2]]
Expand source code
async def calibrate_content(
    self,
    request: CalibrateContentRequest,
) -> TaskResult[CalibrateContentResponse]:
    """
    Calibrate content against standards.

    The content (an artifact or a URL) is evaluated against the configured
    standards to determine its suitability for ad placement.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it,
    both under the same operation id.

    Args:
        request: Request parameters including content to evaluate

    Returns:
        TaskResult containing CalibrateContentResponse with verdict
    """
    task_name = "calibrate_content"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    # Emit the request activity before handing off to the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    response = await self.adapter.calibrate_content(payload)

    # Emit the response activity with the status reported by the adapter.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=response.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(response, CalibrateContentResponse)

Calibrate content against standards.

Evaluates content (artifact or URL) against configured standards to determine suitability for ad placement.

Args

request
Request parameters including content to evaluate

Returns

TaskResult containing CalibrateContentResponse with verdict

async def check_governance(self, request: CheckGovernanceRequest) ‑> TaskResult[CheckGovernanceResponse]
Expand source code
async def check_governance(
    self,
    request: CheckGovernanceRequest,
) -> TaskResult[CheckGovernanceResponse]:
    """Check a proposed or committed action against campaign governance.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it,
    both under the same operation id.

    Args:
        request: Request parameters, serialized to JSON-compatible form with
            ``None`` fields omitted before being sent.

    Returns:
        TaskResult containing CheckGovernanceResponse
    """
    task_name = "check_governance"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    # Emit the request activity before handing off to the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    response = await self.adapter.check_governance(payload)

    # Emit the response activity with the status reported by the adapter.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=response.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(response, CheckGovernanceResponse)

Check a proposed or committed action against campaign governance.

async def close(self) ‑> None
Expand source code
async def close(self) -> None:
    """Close the adapter and release its resources.

    Does nothing when the adapter has no ``close`` attribute; otherwise logs
    a debug message and awaits ``adapter.close()``.
    """
    # Adapters without a close hook need no cleanup.
    if not hasattr(self.adapter, "close"):
        return

    logger.debug(f"Closing adapter for agent {self.agent_config.id}")
    await self.adapter.close()

Close the adapter and clean up resources.

async def comply_test_controller(self, request: ComplyTestControllerRequest) ‑> TaskResult[Union[ComplyTestControllerResponse1, ComplyTestControllerResponse2, ComplyTestControllerResponse3, ComplyTestControllerResponse4]]
Expand source code
async def comply_test_controller(
    self,
    request: ComplyTestControllerRequest,
) -> TaskResult[ComplyTestControllerResponse]:
    """Drive the compliance test controller for sandbox testing.

    Lets sellers simulate state transitions and delivery data in a sandbox
    environment for compliance testing.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it,
    both under the same operation id.

    Args:
        request: Request specifying scenario and parameters.

    Returns:
        TaskResult containing ComplyTestControllerResponse.
    """
    task_name = "comply_test_controller"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    # Emit the request activity before handing off to the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    response = await self.adapter.comply_test_controller(payload)

    # Emit the response activity with the status reported by the adapter.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=response.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(response, ComplyTestControllerResponse)

Compliance test controller for sandbox testing.

Enables sellers to simulate state transitions and delivery data in a sandbox environment for compliance testing.

Args

request
Request specifying scenario and parameters.

Returns

TaskResult containing ComplyTestControllerResponse.

async def context_match(self, request: ContextMatchRequest) ‑> TaskResult[ContextMatchResponse]
Expand source code
async def context_match(
    self,
    request: ContextMatchRequest,
) -> TaskResult[ContextMatchResponse]:
    """Match ad context to buyer packages.

    Contextual signals for a publisher placement are evaluated against the
    buyer's active packages, and the matching offers are returned.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it,
    both under the same operation id.

    Args:
        request: Context match request with placement, property, and
            optional artifact refs, context signals, and geo data.

    Returns:
        TaskResult containing ContextMatchResponse with offers.
    """
    task_name = "context_match"
    op_id = create_operation_id()
    # This request is dumped with by_alias=True, so aliased field names are
    # used in the outgoing parameters.
    payload = request.model_dump(mode="json", exclude_none=True, by_alias=True)

    # Emit the request activity before handing off to the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    response = await self.adapter.context_match(payload)

    # Emit the response activity with the status reported by the adapter.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=response.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(response, ContextMatchResponse)

Match ad context to buyer packages.

Evaluates contextual signals for a publisher placement against the buyer's active packages and returns matching offers.

Args

request
Context match request with placement, property, and optional artifact refs, context signals, and geo data.

Returns

TaskResult containing ContextMatchResponse with offers.

async def create_collection_list(self, request: CreateCollectionListRequest) ‑> TaskResult[CreateCollectionListResponse]
Expand source code
async def create_collection_list(
    self,
    request: CreateCollectionListRequest,
) -> TaskResult[CreateCollectionListResponse]:
    """Create a collection list for governance filtering.

    A collection list defines a dynamic set of collections (properties,
    segments, etc.) that authorization rules and audience scoping can
    reference.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it,
    both under the same operation id.

    Args:
        request: Request parameters for creating the collection list

    Returns:
        TaskResult containing CreateCollectionListResponse with list_id
    """
    task_name = "create_collection_list"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    # Emit the request activity before handing off to the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    response = await self.adapter.create_collection_list(payload)

    # Emit the response activity with the status reported by the adapter.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=response.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(response, CreateCollectionListResponse)

Create a collection list for governance filtering.

Collection lists define dynamic sets of collections (properties, segments, etc.) that can be referenced by authorization rules and audience scoping.

Args

request
Request parameters for creating the collection list

Returns

TaskResult containing CreateCollectionListResponse with list_id

async def create_content_standards(self, request: CreateContentStandardsRequest) ‑> TaskResult[Union[CreateContentStandardsResponse1, CreateContentStandardsResponse2]]
Expand source code
async def create_content_standards(
    self,
    request: CreateContentStandardsRequest,
) -> TaskResult[CreateContentStandardsResponse]:
    """
    Create a new content standards configuration.

    The configuration describes acceptable content contexts for ad placement
    through a natural language policy and optional calibration exemplars.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it,
    both under the same operation id.

    Args:
        request: Request parameters including policy and scope

    Returns:
        TaskResult containing CreateContentStandardsResponse with standards_id
    """
    task_name = "create_content_standards"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    # Emit the request activity before handing off to the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    response = await self.adapter.create_content_standards(payload)

    # Emit the response activity with the status reported by the adapter.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=response.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(response, CreateContentStandardsResponse)

Create a new content standards configuration.

Defines acceptable content contexts for ad placement using natural language policy and optional calibration exemplars.

Args

request
Request parameters including policy and scope

Returns

TaskResult containing CreateContentStandardsResponse with standards_id

async def create_media_buy(self, request: CreateMediaBuyRequest) ‑> TaskResult[Union[CreateMediaBuyResponse1, CreateMediaBuyResponse2, CreateMediaBuyResponse3]]
Expand source code
async def create_media_buy(
    self,
    request: CreateMediaBuyRequest,
) -> TaskResult[CreateMediaBuyResponse]:
    """
    Create a new media buy reservation.

    Asks the agent to reserve inventory for a campaign. The agent answers
    with a media_buy_id that tracks the reservation and can be used for
    later updates.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it,
    both under the same operation id.

    Args:
        request: Media buy creation parameters including:
            - brand: Brand reference; resolved from brand.json or the registry at execution
            - packages: List of package requests specifying desired inventory
            - publisher_properties: Target properties for ad placement
            - budget: Optional budget constraints
            - start_date/end_date: Campaign flight dates

    Returns:
        TaskResult containing CreateMediaBuyResponse with:
            - media_buy_id: Unique identifier for this reservation
            - status: Current state of the media buy
            - packages: Confirmed package details
            - Additional platform-specific metadata

    Example:
        >>> from adcp import ADCPClient, CreateMediaBuyRequest, BrandReference
        >>> client = ADCPClient(agent_config)
        >>> request = CreateMediaBuyRequest(
        ...     brand=BrandReference(domain="acme.com"),
        ...     packages=[package_request],
        ...     publisher_properties=properties,
        ... )
        >>> result = await client.create_media_buy(request)
        >>> if result.success:
        ...     media_buy_id = result.data.media_buy_id
    """
    task_name = "create_media_buy"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    # Emit the request activity before handing off to the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    response = await self.adapter.create_media_buy(payload)

    # Emit the response activity with the status reported by the adapter.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=response.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(response, CreateMediaBuyResponse)

Create a new media buy reservation.

Requests the agent to reserve inventory for a campaign. The agent returns a media_buy_id that tracks this reservation and can be used for updates.

Args

request
Media buy creation parameters, including: brand (brand reference; resolved from brand.json or the registry at execution); packages (list of package requests specifying desired inventory); publisher_properties (target properties for ad placement); budget (optional budget constraints); start_date/end_date (campaign flight dates).

Returns

TaskResult containing CreateMediaBuyResponse with: media_buy_id (unique identifier for this reservation); status (current state of the media buy); packages (confirmed package details); plus additional platform-specific metadata.

Example

>>> from adcp import ADCPClient, CreateMediaBuyRequest, BrandReference
>>> client = ADCPClient(agent_config)
>>> request = CreateMediaBuyRequest(
...     brand=BrandReference(domain="acme.com"),
...     packages=[package_request],
...     publisher_properties=properties,
... )
>>> result = await client.create_media_buy(request)
>>> if result.success:
...     media_buy_id = result.data.media_buy_id
async def create_property_list(self, request: CreatePropertyListRequest) ‑> TaskResult[CreatePropertyListResponse]
Expand source code
async def create_property_list(
    self,
    request: CreatePropertyListRequest,
) -> TaskResult[CreatePropertyListResponse]:
    """
    Create a property list for governance filtering.

    A property list defines a dynamic set of properties derived from
    filters, brand manifests, and feature requirements.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it,
    both under the same operation id.

    Args:
        request: Request parameters for creating the property list

    Returns:
        TaskResult containing CreatePropertyListResponse with list_id
    """
    task_name = "create_property_list"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    # Emit the request activity before handing off to the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    response = await self.adapter.create_property_list(payload)

    # Emit the response activity with the status reported by the adapter.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=response.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(response, CreatePropertyListResponse)

Create a property list for governance filtering.

Property lists define dynamic sets of properties based on filters, brand manifests, and feature requirements.

Args

request
Request parameters for creating the property list

Returns

TaskResult containing CreatePropertyListResponse with list_id

async def delete_collection_list(self, request: DeleteCollectionListRequest) ‑> TaskResult[DeleteCollectionListResponse]
Expand source code
async def delete_collection_list(
    self,
    request: DeleteCollectionListRequest,
) -> TaskResult[DeleteCollectionListResponse]:
    """Delete a collection list.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it,
    both under the same operation id.

    Args:
        request: Request parameters with list_id

    Returns:
        TaskResult containing DeleteCollectionListResponse
    """
    task_name = "delete_collection_list"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    # Emit the request activity before handing off to the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    response = await self.adapter.delete_collection_list(payload)

    # Emit the response activity with the status reported by the adapter.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=response.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(response, DeleteCollectionListResponse)

Delete a collection list.

Args

request
Request parameters with list_id

Returns

TaskResult containing DeleteCollectionListResponse

async def delete_property_list(self, request: DeletePropertyListRequest) ‑> TaskResult[DeletePropertyListResponse]
Expand source code
async def delete_property_list(
    self,
    request: DeletePropertyListRequest,
) -> TaskResult[DeletePropertyListResponse]:
    """Delete a property list through the agent adapter.

    Removes a property list. Any active subscriptions to this list
    will be terminated.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it;
    both share one freshly created operation id.

    Args:
        request: Request parameters with list_id

    Returns:
        TaskResult containing DeletePropertyListResponse
    """
    task_name = "delete_property_list"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    outcome = await self.adapter.delete_property_list(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=outcome.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(outcome, DeletePropertyListResponse)

Delete a property list.

Removes a property list. Any active subscriptions to this list will be terminated.

Args

request
Request parameters with list_id

Returns

TaskResult containing DeletePropertyListResponse

async def fetch_capabilities(self) ‑> GetAdcpCapabilitiesResponse
Expand source code
async def fetch_capabilities(self) -> GetAdcpCapabilitiesResponse:
    """Return the seller's capabilities, preferring a still-valid cache.

    The cached value is returned when both the cached response and its
    fetch timestamp are set and the time elapsed on the monotonic clock is
    strictly less than ``capabilities_ttl``. Otherwise the result of
    ``refresh_capabilities()`` is returned.

    Returns:
        The seller's capabilities response.
    """
    cached = self._capabilities
    # Only consult the timestamp when something is actually cached.
    fetched_at = self._capabilities_fetched_at if cached is not None else None

    if fetched_at is not None:
        age = time.monotonic() - fetched_at
        if age < self.capabilities_ttl:
            return cached

    return await self.refresh_capabilities()

Fetch capabilities, using cache if still valid.

Returns

The seller's capabilities response.

async def get_account_financials(self, request: GetAccountFinancialsRequest) ‑> TaskResult[Union[GetAccountFinancialsResponse1, GetAccountFinancialsResponse2]]
Expand source code
async def get_account_financials(
    self,
    request: GetAccountFinancialsRequest,
) -> TaskResult[GetAccountFinancialsResponse]:
    """Fetch account financials through the agent adapter.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it;
    both share one freshly created operation id.

    Args:
        request: Request parameters

    Returns:
        TaskResult containing GetAccountFinancialsResponse
    """
    task_name = "get_account_financials"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    outcome = await self.adapter.get_account_financials(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=outcome.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(outcome, GetAccountFinancialsResponse)

Get Account Financials.

Args

request
Request parameters

Returns

TaskResult containing GetAccountFinancialsResponse

async def get_adcp_capabilities(self, request: GetAdcpCapabilitiesRequest) ‑> TaskResult[GetAdcpCapabilitiesResponse]
Expand source code
async def get_adcp_capabilities(
    self,
    request: GetAdcpCapabilitiesRequest,
) -> TaskResult[GetAdcpCapabilitiesResponse]:
    """Query the agent for its AdCP capabilities.

    Queries the agent's supported AdCP features, protocol versions, and
    domain-specific capabilities (media_buy, signals, sponsored_intelligence).

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it;
    both share one freshly created operation id.

    Args:
        request: Request parameters including optional protocol filters

    Returns:
        TaskResult containing GetAdcpCapabilitiesResponse with:
            - adcp: Core protocol version information
            - supported_protocols: List of supported domain protocols
            - media_buy: Media buy capabilities (if supported)
            - sponsored_intelligence: SI capabilities (if supported)
            - signals: Signals capabilities (if supported)
    """
    task_name = "get_adcp_capabilities"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    outcome = await self.adapter.get_adcp_capabilities(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=outcome.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(outcome, GetAdcpCapabilitiesResponse)

Get AdCP capabilities from the agent.

Queries the agent's supported AdCP features, protocol versions, and domain-specific capabilities (media_buy, signals, sponsored_intelligence).

Args

request
Request parameters including optional protocol filters

Returns

TaskResult containing GetAdcpCapabilitiesResponse with:

- adcp: Core protocol version information
- supported_protocols: List of supported domain protocols
- media_buy: Media buy capabilities (if supported)
- sponsored_intelligence: SI capabilities (if supported)
- signals: Signals capabilities (if supported)

async def get_brand_identity(self, request: GetBrandIdentityRequest) ‑> TaskResult[Union[GetBrandIdentityResponse1, GetBrandIdentityResponse2]]
Expand source code
async def get_brand_identity(
    self,
    request: GetBrandIdentityRequest,
) -> TaskResult[GetBrandIdentityResponse]:
    """Fetch brand identity information through the agent adapter.

    Retrieves brand identity data including logos, colors, fonts,
    voice synthesis config, and rights availability.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it;
    both share one freshly created operation id.

    Args:
        request: Request with brand_id and optional fields filter.

    Returns:
        TaskResult containing GetBrandIdentityResponse.
    """
    task_name = "get_brand_identity"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    outcome = await self.adapter.get_brand_identity(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=outcome.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(outcome, GetBrandIdentityResponse)

Get brand identity information.

Retrieves brand identity data including logos, colors, fonts, voice synthesis config, and rights availability.

Args

request
Request with brand_id and optional fields filter.

Returns

TaskResult containing GetBrandIdentityResponse.

async def get_collection_list(self, request: GetCollectionListRequest) ‑> TaskResult[GetCollectionListResponse]
Expand source code
async def get_collection_list(
    self,
    request: GetCollectionListRequest,
) -> TaskResult[GetCollectionListResponse]:
    """Fetch a collection list, optionally resolved, through the agent adapter.

    When resolve=true, returns the resolved members of the collection list.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it;
    both share one freshly created operation id.

    Args:
        request: Request parameters including list_id and resolve flag

    Returns:
        TaskResult containing GetCollectionListResponse
    """
    task_name = "get_collection_list"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    outcome = await self.adapter.get_collection_list(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=outcome.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(outcome, GetCollectionListResponse)

Get a collection list with optional resolution.

When resolve=true, returns the resolved members of the collection list.

Args

request
Request parameters including list_id and resolve flag

Returns

TaskResult containing GetCollectionListResponse

async def get_content_standards(self, request: GetContentStandardsRequest) ‑> TaskResult[Union[GetContentStandardsResponse1, GetContentStandardsResponse2]]
Expand source code
async def get_content_standards(
    self,
    request: GetContentStandardsRequest,
) -> TaskResult[GetContentStandardsResponse]:
    """Fetch a content standards configuration by ID.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it;
    both share one freshly created operation id.

    Args:
        request: Request parameters including standards_id

    Returns:
        TaskResult containing GetContentStandardsResponse
    """
    task_name = "get_content_standards"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    outcome = await self.adapter.get_content_standards(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=outcome.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(outcome, GetContentStandardsResponse)

Get a content standards configuration by ID.

Args

request
Request parameters including standards_id

Returns

TaskResult containing GetContentStandardsResponse

async def get_creative_delivery(self, request: GetCreativeDeliveryRequest) ‑> TaskResult[GetCreativeDeliveryResponse]
Expand source code
async def get_creative_delivery(
    self,
    request: GetCreativeDeliveryRequest,
) -> TaskResult[GetCreativeDeliveryResponse]:
    """Fetch creative delivery data through the agent adapter.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it;
    both share one freshly created operation id.

    Args:
        request: Request parameters

    Returns:
        TaskResult containing GetCreativeDeliveryResponse
    """
    task_name = "get_creative_delivery"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    outcome = await self.adapter.get_creative_delivery(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=outcome.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(outcome, GetCreativeDeliveryResponse)

Get Creative Delivery.

Args

request
Request parameters

Returns

TaskResult containing GetCreativeDeliveryResponse

async def get_creative_features(self, request: GetCreativeFeaturesRequest) ‑> TaskResult[Union[GetCreativeFeaturesResponse1, GetCreativeFeaturesResponse2]]
Expand source code
async def get_creative_features(
    self,
    request: GetCreativeFeaturesRequest,
) -> TaskResult[GetCreativeFeaturesResponse]:
    """Evaluate governance features for a creative manifest.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it;
    both share one freshly created operation id.

    Args:
        request: Request parameters, serialized and passed to the adapter.

    Returns:
        TaskResult containing GetCreativeFeaturesResponse
    """
    task_name = "get_creative_features"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    outcome = await self.adapter.get_creative_features(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=outcome.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(outcome, GetCreativeFeaturesResponse)

Evaluate governance features for a creative manifest.

async def get_info(self) ‑> dict[str, typing.Any]
Expand source code
async def get_info(self) -> dict[str, Any]:
    """
    Return agent information including AdCP extension metadata.

    Delegates to the adapter's ``get_agent_info()`` and returns its result
    unchanged. The agent card information includes:
    - Agent name, description, version
    - Protocol type (mcp or a2a)
    - AdCP version (from extensions.adcp.adcp_version)
    - Supported protocols (from extensions.adcp.protocols_supported)
    - Available tools/skills

    Returns:
        Dictionary with agent metadata
    """
    agent_card = await self.adapter.get_agent_info()
    return agent_card

Get agent information including AdCP extension metadata.

Returns agent card information including:

- Agent name, description, version
- Protocol type (mcp or a2a)
- AdCP version (from extensions.adcp.adcp_version)
- Supported protocols (from extensions.adcp.protocols_supported)
- Available tools/skills

Returns

Dictionary with agent metadata

async def get_media_buy_artifacts(self, request: GetMediaBuyArtifactsRequest) ‑> TaskResult[Union[GetMediaBuyArtifactsResponse1, GetMediaBuyArtifactsResponse2]]
Expand source code
async def get_media_buy_artifacts(
    self,
    request: GetMediaBuyArtifactsRequest,
) -> TaskResult[GetMediaBuyArtifactsResponse]:
    """Fetch the artifacts associated with a media buy.

    Retrieves content artifacts where ads were delivered for a media buy.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it;
    both share one freshly created operation id.

    Args:
        request: Request parameters including media_buy_id

    Returns:
        TaskResult containing GetMediaBuyArtifactsResponse
    """
    task_name = "get_media_buy_artifacts"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    outcome = await self.adapter.get_media_buy_artifacts(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=outcome.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(outcome, GetMediaBuyArtifactsResponse)

Get artifacts associated with a media buy.

Retrieves content artifacts where ads were delivered for a media buy.

Args

request
Request parameters including media_buy_id

Returns

TaskResult containing GetMediaBuyArtifactsResponse

async def get_media_buy_delivery(self, request: GetMediaBuyDeliveryRequest) ‑> TaskResult[GetMediaBuyDeliveryResponse]
Expand source code
async def get_media_buy_delivery(
    self,
    request: GetMediaBuyDeliveryRequest,
) -> TaskResult[GetMediaBuyDeliveryResponse]:
    """Fetch media buy delivery data through the agent adapter.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it;
    both share one freshly created operation id.

    Args:
        request: Request parameters

    Returns:
        TaskResult containing GetMediaBuyDeliveryResponse
    """
    task_name = "get_media_buy_delivery"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    outcome = await self.adapter.get_media_buy_delivery(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=outcome.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(outcome, GetMediaBuyDeliveryResponse)

Get Media Buy Delivery.

Args

request
Request parameters

Returns

TaskResult containing GetMediaBuyDeliveryResponse

async def get_media_buys(self, request: GetMediaBuysRequest) ‑> TaskResult[GetMediaBuysResponse]
Expand source code
async def get_media_buys(
    self,
    request: GetMediaBuysRequest,
) -> TaskResult[GetMediaBuysResponse]:
    """Fetch media buys through the agent adapter.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it;
    both share one freshly created operation id.

    Args:
        request: Request parameters

    Returns:
        TaskResult containing GetMediaBuysResponse
    """
    task_name = "get_media_buys"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    outcome = await self.adapter.get_media_buys(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=outcome.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(outcome, GetMediaBuysResponse)

Get Media Buys.

Args

request
Request parameters

Returns

TaskResult containing GetMediaBuysResponse

async def get_plan_audit_logs(self, request: GetPlanAuditLogsRequest) ‑> TaskResult[GetPlanAuditLogsResponse]
Expand source code
async def get_plan_audit_logs(
    self,
    request: GetPlanAuditLogsRequest,
) -> TaskResult[GetPlanAuditLogsResponse]:
    """Retrieve governance state and audit logs for one or more plans.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it;
    both share one freshly created operation id.

    Args:
        request: Request parameters, serialized and passed to the adapter.

    Returns:
        TaskResult containing GetPlanAuditLogsResponse
    """
    task_name = "get_plan_audit_logs"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    outcome = await self.adapter.get_plan_audit_logs(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=outcome.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(outcome, GetPlanAuditLogsResponse)

Retrieve governance state and audit logs for one or more plans.

async def get_products(self,
request: GetProductsRequest,
fetch_previews: bool = False,
preview_output_format: str = 'url',
creative_agent_client: ADCPClient | None = None) ‑> TaskResult[GetProductsResponse]
Expand source code
async def get_products(
    self,
    request: GetProductsRequest,
    fetch_previews: bool = False,
    preview_output_format: str = "url",
    creative_agent_client: ADCPClient | None = None,
) -> TaskResult[GetProductsResponse]:
    """
    Fetch advertising products, optionally enriched with format previews.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it.
    Previews are only generated when they were requested and the parsed
    result is successful and carries data; they are stored under the
    ``"products_with_previews"`` key of the result's metadata.

    Args:
        request: Request parameters
        fetch_previews: If True, generate preview URLs for each product's formats
            (uses batch API for 5-10x performance improvement)
        preview_output_format: "url" for iframe URLs (default), "html" for direct
            embedding (2-3x faster, no iframe overhead)
        creative_agent_client: Client for creative agent (required if
            fetch_previews=True)

    Returns:
        TaskResult containing GetProductsResponse with optional preview URLs in metadata

    Raises:
        ValueError: If fetch_previews=True but creative_agent_client is not provided
    """
    # Fail fast, before any activity is emitted or request is sent.
    if fetch_previews and not creative_agent_client:
        raise ValueError("creative_agent_client is required when fetch_previews=True")

    task_name = "get_products"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    outcome = await self.adapter.get_products(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=outcome.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    parsed: TaskResult[GetProductsResponse] = self.adapter._parse_response(
        outcome, GetProductsResponse
    )

    # Nothing to enrich unless previews were requested and the call produced data.
    if not (fetch_previews and parsed.success and parsed.data and creative_agent_client):
        return parsed

    # Imported lazily, only on the preview path.
    from adcp.utils.preview_cache import add_preview_urls_to_products

    enriched_products = await add_preview_urls_to_products(
        parsed.data.products,
        creative_agent_client,
        use_batch=True,
        output_format=preview_output_format,
    )
    parsed.metadata = parsed.metadata or {}
    parsed.metadata["products_with_previews"] = enriched_products
    return parsed

Get advertising products.

Args

request
Request parameters
fetch_previews
If True, generate preview URLs for each product's formats (uses batch API for 5-10x performance improvement)
preview_output_format
"url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead)
creative_agent_client
Client for creative agent (required if fetch_previews=True)

Returns

TaskResult containing GetProductsResponse with optional preview URLs in metadata

Raises

ValueError
If fetch_previews=True but creative_agent_client is not provided

async def get_property_list(self, request: GetPropertyListRequest) ‑> TaskResult[GetPropertyListResponse]
Expand source code
async def get_property_list(
    self,
    request: GetPropertyListRequest,
) -> TaskResult[GetPropertyListResponse]:
    """
    Fetch a property list, optionally resolved, through the agent adapter.

    When resolve=true, returns the list of resolved property identifiers.
    Use this to get the actual properties that match the list's filters.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it;
    both share one freshly created operation id.

    Args:
        request: Request parameters including list_id and resolve flag

    Returns:
        TaskResult containing GetPropertyListResponse with identifiers
    """
    task_name = "get_property_list"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    outcome = await self.adapter.get_property_list(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=outcome.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(outcome, GetPropertyListResponse)

Get a property list with optional resolution.

When resolve=true, returns the list of resolved property identifiers. Use this to get the actual properties that match the list's filters.

Args

request
Request parameters including list_id and resolve flag

Returns

TaskResult containing GetPropertyListResponse with identifiers

async def get_rights(self, request: GetRightsRequest) ‑> TaskResult[Union[GetRightsResponse1, GetRightsResponse2]]
Expand source code
async def get_rights(
    self,
    request: GetRightsRequest,
) -> TaskResult[GetRightsResponse]:
    """Fetch rights available for licensing through the agent adapter.

    Searches for rights offerings using natural language query and
    filters by type, uses, countries, and buyer compatibility.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it;
    both share one freshly created operation id.

    Args:
        request: Request with query, uses, and optional filters.

    Returns:
        TaskResult containing GetRightsResponse with matched rights.
    """
    task_name = "get_rights"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    outcome = await self.adapter.get_rights(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=outcome.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(outcome, GetRightsResponse)

Get available rights for licensing.

Searches for rights offerings using natural language query and filters by type, uses, countries, and buyer compatibility.

Args

request
Request with query, uses, and optional filters.

Returns

TaskResult containing GetRightsResponse with matched rights.

async def get_signals(self, request: GetSignalsRequest) ‑> TaskResult[GetSignalsResponse]
Expand source code
async def get_signals(
    self,
    request: GetSignalsRequest,
) -> TaskResult[GetSignalsResponse]:
    """Fetch signals through the agent adapter.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it;
    both share one freshly created operation id.

    Args:
        request: Request parameters

    Returns:
        TaskResult containing GetSignalsResponse
    """
    task_name = "get_signals"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    outcome = await self.adapter.get_signals(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=outcome.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(outcome, GetSignalsResponse)

Get Signals.

Args

request
Request parameters

Returns

TaskResult containing GetSignalsResponse

def get_webhook_url(self, task_type: str, operation_id: str) ‑> str
Expand source code
def get_webhook_url(self, task_type: str, operation_id: str) -> str:
    """Build the webhook URL for a task from the configured template.

    The template's ``{agent_id}``, ``{task_type}`` and ``{operation_id}``
    placeholders are filled with this client's agent id and the given
    arguments.

    Args:
        task_type: Task type substituted for ``{task_type}``.
        operation_id: Operation identifier substituted for ``{operation_id}``.

    Returns:
        The formatted webhook URL.

    Raises:
        ValueError: If no webhook_url_template is configured (unset or empty).
    """
    template = self.webhook_url_template
    if template:
        substitutions = {
            "agent_id": self.agent_config.id,
            "task_type": task_type,
            "operation_id": operation_id,
        }
        return template.format(**substitutions)

    raise ValueError("webhook_url_template not configured")

Generate webhook URL for a task.

async def handle_webhook(self,
payload: dict[str, Any] | Task | TaskStatusUpdateEvent,
task_type: str,
operation_id: str,
signature: str | None = None,
timestamp: str | None = None,
raw_body: bytes | str | None = None) ‑> TaskResult[AdcpAsyncResponseData]
Expand source code
async def handle_webhook(
    self,
    payload: dict[str, Any] | Task | TaskStatusUpdateEvent,
    task_type: str,
    operation_id: str,
    signature: str | None = None,
    timestamp: str | None = None,
    raw_body: bytes | str | None = None,
) -> TaskResult[AdcpAsyncResponseData]:
    """
    Process an incoming webhook and return a typed result.

    Single entry point for webhooks from both supported protocols:

    - MCP webhooks: HTTP POST with a dict payload and an optional HMAC signature
    - A2A webhooks: ``Task`` or ``TaskStatusUpdateEvent`` objects, depending on status

    The protocol is detected from the payload's type and the call is routed
    to the matching handler. Either way the caller gets the same TaskResult
    structure with typed AdCP response data.

    Args:
        payload: Webhook payload, one of:
            - dict[str, Any]: MCP webhook payload from an HTTP POST
            - Task: A2A webhook for terminal statuses (completed, failed)
            - TaskStatusUpdateEvent: A2A webhook for intermediate statuses
              (working, input-required, submitted)
        task_type: Task type taken from application routing (e.g., "get_products").
            Applications should extract it from the URL routing pattern:
            /webhook/{task_type}/{agent_id}/{operation_id}
        operation_id: Operation identifier taken from application routing.
            Correlates webhook notifications with the original task submission.
        signature: Optional HMAC-SHA256 signature for MCP webhook verification
            (X-AdCP-Signature header). Ignored for A2A webhooks.
        timestamp: Optional Unix timestamp (seconds) for MCP webhook signature
            verification (X-AdCP-Timestamp header). Required when a signature
            is provided.
        raw_body: Optional raw HTTP request body for signature verification.
            When given, it is used directly instead of re-serializing the
            payload, which avoids cross-language JSON serialization mismatches.
            Strongly recommended for production use.

    Returns:
        TaskResult with parsed task-specific response data. The structure
        is identical regardless of protocol.

    Raises:
        ADCPWebhookSignatureError: If MCP signature verification fails
        ValidationError: If the MCP payload doesn't match the WebhookPayload schema

    Note:
        task_type and operation_id are deprecated in the webhook payload
        per the AdCP specification. Applications must extract them from URL
        routing and pass them explicitly.

    Examples:
        MCP webhook (HTTP endpoint):
        >>> @app.post("/webhook/{task_type}/{agent_id}/{operation_id}")
        ... async def webhook_handler(task_type: str, operation_id: str, request: Request):
        ...     raw_body = await request.body()
        ...     payload = json.loads(raw_body)
        ...     signature = request.headers.get("X-AdCP-Signature")
        ...     timestamp = request.headers.get("X-AdCP-Timestamp")
        ...     result = await client.handle_webhook(
        ...         payload, task_type, operation_id, signature, timestamp,
        ...         raw_body=raw_body,
        ...     )
        ...     if result.success:
        ...         print(f"Task completed: {result.data}")

        A2A webhook with Task (terminal status):
        >>> async def on_task_completed(task: Task):
        ...     # Look up task_type and operation_id in your app's task tracking
        ...     task_type = your_task_registry.get_type(task.id)
        ...     operation_id = your_task_registry.get_operation_id(task.id)
        ...     result = await client.handle_webhook(
        ...         task, task_type, operation_id
        ...     )
        ...     if result.success:
        ...         print(f"Task completed: {result.data}")

        A2A webhook with TaskStatusUpdateEvent (intermediate status):
        >>> async def on_task_update(event: TaskStatusUpdateEvent):
        ...     # Look up task_type and operation_id in your app's task tracking
        ...     task_type = your_task_registry.get_type(event.task_id)
        ...     operation_id = your_task_registry.get_operation_id(event.task_id)
        ...     result = await client.handle_webhook(
        ...         event, task_type, operation_id
        ...     )
        ...     if result.status == GeneratedTaskStatus.working:
        ...         print(f"Task still working: {result.metadata.get('message')}")
    """
    # Anything that is not an A2A object is treated as an MCP dict payload;
    # only that path uses the signature, timestamp and raw body.
    if not isinstance(payload, (Task, TaskStatusUpdateEvent)):
        return await self._handle_mcp_webhook(
            payload, task_type, operation_id, signature, timestamp, raw_body
        )

    # A2A webhook (Task or TaskStatusUpdateEvent)
    return await self._handle_a2a_webhook(payload, task_type, operation_id)

Handle incoming webhook and return typed result.

This method provides a unified interface for handling webhooks from both MCP and A2A protocols:

  • MCP Webhooks: HTTP POST with dict payload, optional HMAC signature
  • A2A Webhooks: Task or TaskStatusUpdateEvent objects based on status

The method automatically detects the protocol type and routes to the appropriate handler. Both protocols return a consistent TaskResult structure with typed AdCP response data.

Args

payload
Webhook payload, one of: dict[str, Any] (MCP webhook payload from HTTP POST); Task (A2A webhook for terminal statuses: completed, failed); or TaskStatusUpdateEvent (A2A webhook for intermediate statuses: working, input-required, submitted).
task_type
Task type from application routing (e.g., "get_products"). Applications should extract this from URL routing pattern: /webhook/{task_type}/{agent_id}/{operation_id}
operation_id
Operation identifier from application routing. Used to correlate webhook notifications with original task submission.
signature
Optional HMAC-SHA256 signature for MCP webhook verification (X-AdCP-Signature header). Ignored for A2A webhooks.
timestamp
Optional Unix timestamp (seconds) for MCP webhook signature verification (X-AdCP-Timestamp header). Required when signature is provided.
raw_body
Optional raw HTTP request body bytes for signature verification. When provided, used directly instead of re-serializing the payload, avoiding cross-language JSON serialization mismatches. Strongly recommended for production use.

Returns

TaskResult with parsed task-specific response data. The structure is identical regardless of protocol.

Raises

ADCPWebhookSignatureError
If MCP signature verification fails
ValidationError
If MCP payload doesn't match WebhookPayload schema

Note

task_type and operation_id are deprecated in the webhook payload per the AdCP specification. Applications must extract them from URL routing and pass them explicitly.

Examples

MCP webhook (HTTP endpoint):

>>> @app.post("/webhook/{task_type}/{agent_id}/{operation_id}")
>>> async def webhook_handler(task_type: str, operation_id: str, request: Request):
>>>     raw_body = await request.body()
>>>     payload = json.loads(raw_body)
>>>     signature = request.headers.get("X-AdCP-Signature")
>>>     timestamp = request.headers.get("X-AdCP-Timestamp")
>>>     result = await client.handle_webhook(
>>>         payload, task_type, operation_id, signature, timestamp,
>>>         raw_body=raw_body,
>>>     )
>>>     if result.success:
>>>         print(f"Task completed: {result.data}")

A2A webhook with Task (terminated status):

>>> async def on_task_completed(task: Task):
>>>     # Extract task_type and operation_id from your app's task tracking
>>>     task_type = your_task_registry.get_type(task.id)
>>>     operation_id = your_task_registry.get_operation_id(task.id)
>>>     result = await client.handle_webhook(
>>>         task, task_type, operation_id
>>>     )
>>>     if result.success:
>>>         print(f"Task completed: {result.data}")

A2A webhook with TaskStatusUpdateEvent (intermediate status):

>>> async def on_task_update(event: TaskStatusUpdateEvent):
>>>     # Extract task_type and operation_id from your app's task tracking
>>>     task_type = your_task_registry.get_type(event.task_id)
>>>     operation_id = your_task_registry.get_operation_id(event.task_id)
>>>     result = await client.handle_webhook(
>>>         event, task_type, operation_id
>>>     )
>>>     if result.status == GeneratedTaskStatus.working:
>>>         print(f"Task still working: {result.metadata.get('message')}")
async def identity_match(self, request: IdentityMatchRequest) ‑> TaskResult[IdentityMatchResponse]
Expand source code
async def identity_match(
    self,
    request: IdentityMatchRequest,
) -> TaskResult[IdentityMatchResponse]:
    """Match a user identity for package eligibility.

    Evaluates a user identity token against all active packages for
    frequency capping and personalization. A ``PROTOCOL_REQUEST`` activity
    is emitted before the adapter call and a ``PROTOCOL_RESPONSE`` activity
    after it.

    Args:
        request: Identity match request with user_token, uid_type,
            and package_ids. Dumped in JSON mode by alias, with ``None``
            fields excluded.

    Returns:
        TaskResult containing IdentityMatchResponse with eligible_package_ids.
    """
    task_name = "identity_match"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True, by_alias=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.identity_match(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, IdentityMatchResponse)

Match user identity for package eligibility.

Evaluates a user identity token against all active packages for frequency capping and personalization.

Args

request
Identity match request with user_token, uid_type, and package_ids.

Returns

TaskResult containing IdentityMatchResponse with eligible_package_ids.

async def list_accounts(self, request: ListAccountsRequest) ‑> TaskResult[ListAccountsResponse]
Expand source code
async def list_accounts(
    self,
    request: ListAccountsRequest,
) -> TaskResult[ListAccountsResponse]:
    """
    Run the ``list_accounts`` task against the agent.

    A ``PROTOCOL_REQUEST`` activity is emitted before the adapter call and a
    ``PROTOCOL_RESPONSE`` activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters; dumped in JSON mode with ``None`` fields
            excluded before being passed to the adapter.

    Returns:
        TaskResult containing ListAccountsResponse
    """
    task_name = "list_accounts"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.list_accounts(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, ListAccountsResponse)

List Accounts.

Args

request
Request parameters

Returns

TaskResult containing ListAccountsResponse

async def list_collection_lists(self, request: ListCollectionListsRequest) ‑> TaskResult[ListCollectionListsResponse]
Expand source code
async def list_collection_lists(
    self,
    request: ListCollectionListsRequest,
) -> TaskResult[ListCollectionListsResponse]:
    """List the collection lists owned by a principal.

    A ``PROTOCOL_REQUEST`` activity is emitted before the adapter call and a
    ``PROTOCOL_RESPONSE`` activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters with optional filtering

    Returns:
        TaskResult containing ListCollectionListsResponse
    """
    task_name = "list_collection_lists"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.list_collection_lists(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, ListCollectionListsResponse)

List collection lists owned by a principal.

Args

request
Request parameters with optional filtering

Returns

TaskResult containing ListCollectionListsResponse

async def list_content_standards(self, request: ListContentStandardsRequest) ‑> TaskResult[Union[ListContentStandardsResponse1, ListContentStandardsResponse2]]
Expand source code
async def list_content_standards(
    self,
    request: ListContentStandardsRequest,
) -> TaskResult[ListContentStandardsResponse]:
    """
    Retrieve the content standards configurations.

    A ``PROTOCOL_REQUEST`` activity is emitted before the adapter call and a
    ``PROTOCOL_RESPONSE`` activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters including optional filters

    Returns:
        TaskResult containing ListContentStandardsResponse
    """
    task_name = "list_content_standards"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.list_content_standards(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, ListContentStandardsResponse)

List content standards configurations.

Args

request
Request parameters including optional filters

Returns

TaskResult containing ListContentStandardsResponse

async def list_creative_formats(self,
request: ListCreativeFormatsRequest,
fetch_previews: bool = False,
preview_output_format: str = 'url') ‑> TaskResult[ListCreativeFormatsResponse]
Expand source code
async def list_creative_formats(
    self,
    request: ListCreativeFormatsRequest,
    fetch_previews: bool = False,
    preview_output_format: str = "url",
) -> TaskResult[ListCreativeFormatsResponse]:
    """
    Retrieve the creative formats the agent supports.

    A ``PROTOCOL_REQUEST`` activity is emitted before the adapter call and a
    ``PROTOCOL_RESPONSE`` activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters
        fetch_previews: If True, generate preview URLs for each format using
            sample manifests (uses batch API for 5-10x performance improvement).
            Previews are only attached when the call succeeded and returned data.
        preview_output_format: "url" for iframe URLs (default), "html" for direct
            embedding (2-3x faster, no iframe overhead)

    Returns:
        TaskResult containing ListCreativeFormatsResponse; when previews were
        fetched they are stored under ``metadata["formats_with_previews"]``.
    """
    task_name = "list_creative_formats"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.list_creative_formats(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    result: TaskResult[ListCreativeFormatsResponse] = self.adapter._parse_response(
        raw, ListCreativeFormatsResponse
    )

    # Nothing to enrich unless previews were requested and the call produced data.
    if not (fetch_previews and result.success and result.data):
        return result

    # Imported at the point of use, as in the original implementation.
    from adcp.utils.preview_cache import add_preview_urls_to_formats

    previews = await add_preview_urls_to_formats(
        result.data.formats,
        self,
        use_batch=True,
        output_format=preview_output_format,
    )
    result.metadata = result.metadata or {}
    result.metadata["formats_with_previews"] = previews

    return result

List supported creative formats.

Args

request
Request parameters
fetch_previews
If True, generate preview URLs for each format using sample manifests (uses batch API for 5-10x performance improvement)
preview_output_format
"url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead)

Returns

TaskResult containing ListCreativeFormatsResponse with optional preview URLs in metadata

async def list_creatives(self, request: ListCreativesRequest) ‑> TaskResult[ListCreativesResponse]
Expand source code
async def list_creatives(
    self,
    request: ListCreativesRequest,
) -> TaskResult[ListCreativesResponse]:
    """
    Run the ``list_creatives`` task against the agent.

    A ``PROTOCOL_REQUEST`` activity is emitted before the adapter call and a
    ``PROTOCOL_RESPONSE`` activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters; dumped in JSON mode with ``None`` fields
            excluded before being passed to the adapter.

    Returns:
        TaskResult containing ListCreativesResponse
    """
    task_name = "list_creatives"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.list_creatives(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, ListCreativesResponse)

List Creatives.

Args

request
Request parameters

Returns

TaskResult containing ListCreativesResponse

async def list_property_lists(self, request: ListPropertyListsRequest) ‑> TaskResult[ListPropertyListsResponse]
Expand source code
async def list_property_lists(
    self,
    request: ListPropertyListsRequest,
) -> TaskResult[ListPropertyListsResponse]:
    """
    List the property lists owned by a principal.

    Retrieves metadata for all property lists, optionally filtered
    by principal or pagination parameters. A ``PROTOCOL_REQUEST`` activity
    is emitted before the adapter call and a ``PROTOCOL_RESPONSE`` activity
    after it.

    Args:
        request: Request parameters with optional filtering

    Returns:
        TaskResult containing ListPropertyListsResponse
    """
    task_name = "list_property_lists"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.list_property_lists(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, ListPropertyListsResponse)

List property lists owned by a principal.

Retrieves metadata for all property lists, optionally filtered by principal or pagination parameters.

Args

request
Request parameters with optional filtering

Returns

TaskResult containing ListPropertyListsResponse

async def list_tools(self) ‑> list[str]
Expand source code
async def list_tools(self) -> list[str]:
    """
    Ask the adapter which tools the agent exposes.

    Returns:
        List of tool names
    """
    tool_names = await self.adapter.list_tools()
    return tool_names

List available tools from the agent.

Returns

List of tool names

async def log_event(self, request: LogEventRequest) ‑> TaskResult[Union[LogEventResponse1, LogEventResponse2]]
Expand source code
async def log_event(
    self,
    request: LogEventRequest,
) -> TaskResult[LogEventResponse]:
    """
    Run the ``log_event`` task against the agent.

    ``_validate_task_features`` is called for this task before anything else.
    A ``PROTOCOL_REQUEST`` activity is then emitted before the adapter call
    and a ``PROTOCOL_RESPONSE`` activity (carrying the raw result's status)
    after it.

    Args:
        request: Request parameters; dumped in JSON mode with ``None`` fields
            excluded before being passed to the adapter.

    Returns:
        TaskResult containing LogEventResponse
    """
    task_name = "log_event"
    self._validate_task_features(task_name)
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.log_event(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, LogEventResponse)

Log Event.

Args

request
Request parameters

Returns

TaskResult containing LogEventResponse

async def preview_creative(self, request: PreviewCreativeRequest) ‑> TaskResult[Union[PreviewCreativeResponse1, PreviewCreativeResponse2, PreviewCreativeResponse3]]
Expand source code
async def preview_creative(
    self,
    request: PreviewCreativeRequest,
) -> TaskResult[PreviewCreativeResponse]:
    """
    Request a preview of a creative manifest.

    A ``PROTOCOL_REQUEST`` activity is emitted before the adapter call and a
    ``PROTOCOL_RESPONSE`` activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters; dumped in JSON mode with ``None`` fields
            excluded before being passed to the adapter.

    Returns:
        TaskResult containing PreviewCreativeResponse with preview URLs
    """
    task_name = "preview_creative"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.preview_creative(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, PreviewCreativeResponse)

Generate preview of a creative manifest.

Args

request
Request parameters

Returns

TaskResult containing PreviewCreativeResponse with preview URLs

async def provide_performance_feedback(self, request: ProvidePerformanceFeedbackRequest) ‑> TaskResult[Union[ProvidePerformanceFeedbackResponse1, ProvidePerformanceFeedbackResponse2]]
Expand source code
async def provide_performance_feedback(
    self,
    request: ProvidePerformanceFeedbackRequest,
) -> TaskResult[ProvidePerformanceFeedbackResponse]:
    """
    Run the ``provide_performance_feedback`` task against the agent.

    A ``PROTOCOL_REQUEST`` activity is emitted before the adapter call and a
    ``PROTOCOL_RESPONSE`` activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters; dumped in JSON mode with ``None`` fields
            excluded before being passed to the adapter.

    Returns:
        TaskResult containing ProvidePerformanceFeedbackResponse
    """
    task_name = "provide_performance_feedback"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.provide_performance_feedback(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, ProvidePerformanceFeedbackResponse)

Provide Performance Feedback.

Args

request
Request parameters

Returns

TaskResult containing ProvidePerformanceFeedbackResponse

async def refresh_capabilities(self) ‑> GetAdcpCapabilitiesResponse
Expand source code
async def refresh_capabilities(self) -> GetAdcpCapabilitiesResponse:
    """Fetch capabilities from the seller, bypassing cache.

    On success the cached capabilities, the feature resolver built from
    them, and the fetch time (monotonic clock) are all replaced.

    Returns:
        The seller's capabilities response.

    Raises:
        ADCPError: If the capabilities call did not succeed or returned no data.
    """
    outcome = await self.get_adcp_capabilities(GetAdcpCapabilitiesRequest())

    # Reject failures up front so the cache is only touched on success.
    if not outcome.success or outcome.data is None:
        raise ADCPError(
            f"Failed to fetch capabilities: {outcome.error or outcome.message}",
            agent_id=self.agent_config.id,
            agent_uri=self.agent_config.agent_uri,
        )

    self._capabilities = outcome.data
    self._feature_resolver = FeatureResolver(outcome.data)
    self._capabilities_fetched_at = time.monotonic()
    return self._capabilities

Fetch capabilities from the seller, bypassing cache.

Returns

The seller's capabilities response.

async def report_plan_outcome(self, request: ReportPlanOutcomeRequest) ‑> TaskResult[ReportPlanOutcomeResponse]
Expand source code
async def report_plan_outcome(
    self,
    request: ReportPlanOutcomeRequest,
) -> TaskResult[ReportPlanOutcomeResponse]:
    """Report the outcome of a governed action to the governance agent.

    A ``PROTOCOL_REQUEST`` activity is emitted before the adapter call and a
    ``PROTOCOL_RESPONSE`` activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters; dumped in JSON mode with ``None`` fields
            excluded before being passed to the adapter.

    Returns:
        TaskResult containing ReportPlanOutcomeResponse
    """
    task_name = "report_plan_outcome"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.report_plan_outcome(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, ReportPlanOutcomeResponse)

Report the outcome of a governed action to the governance agent.

async def report_usage(self, request: ReportUsageRequest) ‑> TaskResult[ReportUsageResponse]
Expand source code
async def report_usage(
    self,
    request: ReportUsageRequest,
) -> TaskResult[ReportUsageResponse]:
    """
    Run the ``report_usage`` task against the agent.

    A ``PROTOCOL_REQUEST`` activity is emitted before the adapter call and a
    ``PROTOCOL_RESPONSE`` activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters; dumped in JSON mode with ``None`` fields
            excluded before being passed to the adapter.

    Returns:
        TaskResult containing ReportUsageResponse
    """
    task_name = "report_usage"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.report_usage(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, ReportUsageResponse)

Report Usage.

Args

request
Request parameters

Returns

TaskResult containing ReportUsageResponse

def require(self, *features: str) ‑> None
Expand source code
def require(self, *features: str) -> None:
    """Assert that the seller supports every listed feature.

    Delegates to the feature resolver's ``require``, passing this agent's
    id and URI along with the features.

    Args:
        *features: Feature identifiers to require.

    Raises:
        ADCPFeatureUnsupportedError: If any features are not supported.
        ADCPError: If capabilities have not been fetched yet.
    """
    resolver = self._ensure_resolver()
    resolver.require(
        *features,
        agent_id=self.agent_config.id,
        agent_uri=self.agent_config.agent_uri,
    )

Assert that the seller supports all listed features.

Args

*features
Feature identifiers to require.

Raises

ADCPFeatureUnsupportedError
If any features are not supported.
ADCPError
If capabilities have not been fetched yet.
async def si_get_offering(self, request: SiGetOfferingRequest) ‑> TaskResult[SiGetOfferingResponse]
Expand source code
async def si_get_offering(
    self,
    request: SiGetOfferingRequest,
) -> TaskResult[SiGetOfferingResponse]:
    """
    Fetch a sponsored intelligence offering.

    Retrieves product/service offerings that can be presented in a
    sponsored intelligence session. A ``PROTOCOL_REQUEST`` activity is
    emitted before the adapter call and a ``PROTOCOL_RESPONSE`` activity
    after it.

    Args:
        request: Request parameters including brand context

    Returns:
        TaskResult containing SiGetOfferingResponse
    """
    task_name = "si_get_offering"
    op_id = create_operation_id()
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.si_get_offering(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, SiGetOfferingResponse)

Get sponsored intelligence offering.

Retrieves product/service offerings that can be presented in a sponsored intelligence session.

Args

request
Request parameters including brand context

Returns

TaskResult containing SiGetOfferingResponse

async def si_initiate_session(self, request: SiInitiateSessionRequest) ‑> TaskResult[SiInitiateSessionResponse]
Expand source code
async def si_initiate_session(
    self,
    request: SiInitiateSessionRequest,
) -> TaskResult[SiInitiateSessionResponse]:
    """
    Open a sponsored intelligence session.

    Starts a conversational brand experience session with a user. A
    PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters including identity and context

    Returns:
        TaskResult containing SiInitiateSessionResponse with session_id
    """
    task_name = "si_initiate_session"
    op_id = create_operation_id()
    # Serialize to JSON-compatible values; unset (None) fields are dropped.
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.si_initiate_session(payload)

    # Same operation id on both events so observers can pair them.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, SiInitiateSessionResponse)

Initiate a sponsored intelligence session.

Starts a conversational brand experience session with a user.

Args

request
Request parameters including identity and context

Returns

TaskResult containing SiInitiateSessionResponse with session_id

async def si_send_message(self, request: SiSendMessageRequest) ‑> TaskResult[SiSendMessageResponse]
Expand source code
async def si_send_message(
    self,
    request: SiSendMessageRequest,
) -> TaskResult[SiSendMessageResponse]:
    """
    Post a message to a sponsored intelligence session.

    Continues the conversation in an active SI session. A
    PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters including session_id and message

    Returns:
        TaskResult containing SiSendMessageResponse with brand response
    """
    task_name = "si_send_message"
    op_id = create_operation_id()
    # Serialize to JSON-compatible values; unset (None) fields are dropped.
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.si_send_message(payload)

    # Same operation id on both events so observers can pair them.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, SiSendMessageResponse)

Send a message in a sponsored intelligence session.

Continues the conversation in an active SI session.

Args

request
Request parameters including session_id and message

Returns

TaskResult containing SiSendMessageResponse with brand response

async def si_terminate_session(self, request: SiTerminateSessionRequest) ‑> TaskResult[SiTerminateSessionResponse]
Expand source code
async def si_terminate_session(
    self,
    request: SiTerminateSessionRequest,
) -> TaskResult[SiTerminateSessionResponse]:
    """
    Close a sponsored intelligence session.

    Ends an active SI session, optionally with follow-up actions. A
    PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters including session_id and termination context

    Returns:
        TaskResult containing SiTerminateSessionResponse
    """
    task_name = "si_terminate_session"
    op_id = create_operation_id()
    # Serialize to JSON-compatible values; unset (None) fields are dropped.
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.si_terminate_session(payload)

    # Same operation id on both events so observers can pair them.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, SiTerminateSessionResponse)

Terminate a sponsored intelligence session.

Ends an active SI session, optionally with follow-up actions.

Args

request
Request parameters including session_id and termination context

Returns

TaskResult containing SiTerminateSessionResponse

def supports(self, feature: str) ‑> bool
Expand source code
def supports(self, feature: str) -> bool:
    """Report whether the seller declares support for ``feature``.

    The lookup is delegated to the capability resolver, which understands
    several identifier namespaces:

    - Protocol support: ``supports("media_buy")`` checks ``supported_protocols``
    - Extension support: ``supports("ext:scope3")`` checks ``extensions_supported``
    - Targeting: ``supports("targeting.geo_countries")`` checks
      ``media_buy.execution.targeting``
    - Media buy features: ``supports("audience_targeting")`` checks
      ``media_buy.features``
    - Signals features: ``supports("catalog_signals")`` checks
      ``signals.features``

    Args:
        feature: Feature identifier to check.

    Returns:
        True if the seller declares the feature as supported.

    Raises:
        ADCPError: If capabilities have not been fetched yet.
    """
    resolver = self._ensure_resolver()
    return resolver.supports(feature)

Check if the seller supports a feature.

Supports multiple feature namespaces:

  • Protocol support: supports("media_buy") checks supported_protocols
  • Extension support: supports("ext:scope3") checks extensions_supported
  • Targeting: supports("targeting.geo_countries") checks media_buy.execution.targeting
  • Media buy features: supports("audience_targeting") checks media_buy.features
  • Signals features: supports("catalog_signals") checks signals.features

Args

feature
Feature identifier to check.

Returns

True if the seller declares the feature as supported.

Raises

ADCPError
If capabilities have not been fetched yet.
async def sync_accounts(self, request: SyncAccountsRequest) ‑> TaskResult[Union[SyncAccountsResponse1, SyncAccountsResponse2]]
Expand source code
async def sync_accounts(
    self,
    request: SyncAccountsRequest,
) -> TaskResult[SyncAccountsResponse]:
    """
    Run the ``sync_accounts`` task against the agent.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters

    Returns:
        TaskResult containing SyncAccountsResponse
    """
    task_name = "sync_accounts"
    op_id = create_operation_id()
    # Serialize to JSON-compatible values; unset (None) fields are dropped.
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.sync_accounts(payload)

    # Same operation id on both events so observers can pair them.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, SyncAccountsResponse)

Sync Accounts.

Args

request
Request parameters

Returns

TaskResult containing SyncAccountsResponse

async def sync_audiences(self, request: SyncAudiencesRequest) ‑> TaskResult[Union[SyncAudiencesResponse1, SyncAudiencesResponse2]]
Expand source code
async def sync_audiences(
    self,
    request: SyncAudiencesRequest,
) -> TaskResult[SyncAudiencesResponse]:
    """
    Sync Audiences.

    Before anything is sent, the task is passed through
    ``_validate_task_features`` so that clients created with
    ``validate_features=True`` check the seller's declared capabilities
    first (the client documents ``sync_audiences`` as requiring
    ``audience_targeting``). This mirrors ``sync_event_sources``.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters

    Returns:
        TaskResult containing SyncAudiencesResponse

    Raises:
        ADCPFeatureUnsupportedError: Presumably raised by the feature
            check when validation is enabled and the seller lacks the
            required feature -- NOTE(review): confirm against
            ``_validate_task_features``.
    """
    # Fail fast, before an operation id is minted or any activity is
    # emitted, when the seller cannot serve this task.
    self._validate_task_features("sync_audiences")
    operation_id = create_operation_id()
    # Serialize to JSON-compatible values; unset (None) fields are dropped.
    params = request.model_dump(mode="json", exclude_none=True)

    self._emit_activity(
        Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=operation_id,
            agent_id=self.agent_config.id,
            task_type="sync_audiences",
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
    )

    raw_result = await self.adapter.sync_audiences(params)

    # Same operation id on both events so observers can pair them.
    self._emit_activity(
        Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=operation_id,
            agent_id=self.agent_config.id,
            task_type="sync_audiences",
            status=raw_result.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
    )

    return self.adapter._parse_response(raw_result, SyncAudiencesResponse)

Sync Audiences.

Args

request
Request parameters

Returns

TaskResult containing SyncAudiencesResponse

async def sync_catalogs(self, request: SyncCatalogsRequest) ‑> TaskResult[Union[SyncCatalogsResponse1, SyncCatalogsResponse2]]
Expand source code
async def sync_catalogs(
    self,
    request: SyncCatalogsRequest,
) -> TaskResult[SyncCatalogsResponse]:
    """
    Run the ``sync_catalogs`` task against the agent.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters

    Returns:
        TaskResult containing SyncCatalogsResponse
    """
    task_name = "sync_catalogs"
    op_id = create_operation_id()
    # Serialize to JSON-compatible values; unset (None) fields are dropped.
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.sync_catalogs(payload)

    # Same operation id on both events so observers can pair them.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, SyncCatalogsResponse)

Sync Catalogs.

Args

request
Request parameters

Returns

TaskResult containing SyncCatalogsResponse

async def sync_creatives(self, request: SyncCreativesRequest) ‑> TaskResult[Union[SyncCreativesResponse1, SyncCreativesResponse2, SyncCreativesResponse3]]
Expand source code
async def sync_creatives(
    self,
    request: SyncCreativesRequest,
) -> TaskResult[SyncCreativesResponse]:
    """
    Run the ``sync_creatives`` task against the agent.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters

    Returns:
        TaskResult containing SyncCreativesResponse
    """
    task_name = "sync_creatives"
    op_id = create_operation_id()
    # Serialize to JSON-compatible values; unset (None) fields are dropped.
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.sync_creatives(payload)

    # Same operation id on both events so observers can pair them.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, SyncCreativesResponse)

Sync Creatives.

Args

request
Request parameters

Returns

TaskResult containing SyncCreativesResponse

async def sync_event_sources(self, request: SyncEventSourcesRequest) ‑> TaskResult[Union[SyncEventSourcesResponse1, SyncEventSourcesResponse2]]
Expand source code
async def sync_event_sources(
    self,
    request: SyncEventSourcesRequest,
) -> TaskResult[SyncEventSourcesResponse]:
    """
    Run the ``sync_event_sources`` task against the agent.

    The task name is first passed through ``_validate_task_features``;
    only then is an operation id created and the request sent. A
    PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters

    Returns:
        TaskResult containing SyncEventSourcesResponse
    """
    task_name = "sync_event_sources"
    # The feature check runs before any other work, so nothing is emitted
    # if it raises.
    self._validate_task_features(task_name)
    op_id = create_operation_id()
    # Serialize to JSON-compatible values; unset (None) fields are dropped.
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.sync_event_sources(payload)

    # Same operation id on both events so observers can pair them.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, SyncEventSourcesResponse)

Sync Event Sources.

Args

request
Request parameters

Returns

TaskResult containing SyncEventSourcesResponse

async def sync_governance(self, request: SyncGovernanceRequest) ‑> TaskResult[Union[SyncGovernanceResponse1, SyncGovernanceResponse2]]
Expand source code
async def sync_governance(
    self,
    request: SyncGovernanceRequest,
) -> TaskResult[SyncGovernanceResponse]:
    """Synchronize the governance agents attached to an account.

    Attaches, detaches, or replaces the set of governance agents that
    must be consulted for plan approval on an account. A PROTOCOL_REQUEST
    activity is emitted before the adapter call and a PROTOCOL_RESPONSE
    activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters with account and governance agents

    Returns:
        TaskResult containing SyncGovernanceResponse
    """
    task_name = "sync_governance"
    op_id = create_operation_id()
    # Serialize to JSON-compatible values; unset (None) fields are dropped.
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.sync_governance(payload)

    # Same operation id on both events so observers can pair them.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, SyncGovernanceResponse)

Sync governance agents attached to an account.

Attach, detach, or replace the set of governance agents that must be consulted for plan approval on an account.

Args

request
Request parameters with account and governance agents

Returns

TaskResult containing SyncGovernanceResponse

async def sync_plans(self, request: SyncPlansRequest) ‑> TaskResult[SyncPlansResponse]
Expand source code
async def sync_plans(
    self,
    request: SyncPlansRequest,
) -> TaskResult[SyncPlansResponse]:
    """Sync campaign governance plans to the governance agent.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters

    Returns:
        TaskResult containing SyncPlansResponse
    """
    task_name = "sync_plans"
    op_id = create_operation_id()
    # Serialize to JSON-compatible values; unset (None) fields are dropped.
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.sync_plans(payload)

    # Same operation id on both events so observers can pair them.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, SyncPlansResponse)

Sync campaign governance plans to the governance agent.

async def update_collection_list(self, request: UpdateCollectionListRequest) ‑> TaskResult[UpdateCollectionListResponse]
Expand source code
async def update_collection_list(
    self,
    request: UpdateCollectionListRequest,
) -> TaskResult[UpdateCollectionListResponse]:
    """Apply updates to an existing collection list.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters with list_id and updates

    Returns:
        TaskResult containing UpdateCollectionListResponse
    """
    task_name = "update_collection_list"
    op_id = create_operation_id()
    # Serialize to JSON-compatible values; unset (None) fields are dropped.
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.update_collection_list(payload)

    # Same operation id on both events so observers can pair them.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, UpdateCollectionListResponse)

Update a collection list.

Args

request
Request parameters with list_id and updates

Returns

TaskResult containing UpdateCollectionListResponse

async def update_content_standards(self, request: UpdateContentStandardsRequest) ‑> TaskResult[Union[UpdateContentStandardsResponse1, UpdateContentStandardsResponse2]]
Expand source code
async def update_content_standards(
    self,
    request: UpdateContentStandardsRequest,
) -> TaskResult[UpdateContentStandardsResponse]:
    """
    Apply updates to a content standards configuration.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters including standards_id and updates

    Returns:
        TaskResult containing UpdateContentStandardsResponse
    """
    task_name = "update_content_standards"
    op_id = create_operation_id()
    # Serialize to JSON-compatible values; unset (None) fields are dropped.
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.update_content_standards(payload)

    # Same operation id on both events so observers can pair them.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, UpdateContentStandardsResponse)

Update a content standards configuration.

Args

request
Request parameters including standards_id and updates

Returns

TaskResult containing UpdateContentStandardsResponse

async def update_media_buy(self, request: UpdateMediaBuyRequest) ‑> TaskResult[Union[UpdateMediaBuyResponse1, UpdateMediaBuyResponse2]]
Expand source code
async def update_media_buy(
    self,
    request: UpdateMediaBuyRequest,
) -> TaskResult[UpdateMediaBuyResponse]:
    """
    Modify an existing media buy reservation.

    Changes a previously created media buy by updating its packages or
    its publisher properties. What gets changed is expressed through a
    discriminated union: either package details or targeting properties.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Media buy update parameters including:
            - media_buy_id: Identifier from create_media_buy response
            - updates: Discriminated union specifying update type:
                * UpdateMediaBuyPackagesRequest: Modify package selections
                * UpdateMediaBuyPropertiesRequest: Change targeting properties

    Returns:
        TaskResult containing UpdateMediaBuyResponse with:
            - media_buy_id: The updated media buy identifier
            - status: Updated state of the media buy
            - packages: Updated package configurations
            - Additional platform-specific metadata

    Example:
        >>> from adcp import ADCPClient, UpdateMediaBuyPackagesRequest
        >>> client = ADCPClient(agent_config)
        >>> request = UpdateMediaBuyPackagesRequest(
        ...     media_buy_id="mb_123",
        ...     packages=[updated_package]
        ... )
        >>> result = await client.update_media_buy(request)
        >>> if result.success:
        ...     updated_packages = result.data.packages
    """
    task_name = "update_media_buy"
    op_id = create_operation_id()
    # Serialize to JSON-compatible values; unset (None) fields are dropped.
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.update_media_buy(payload)

    # Same operation id on both events so observers can pair them.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, UpdateMediaBuyResponse)

Update an existing media buy reservation.

Modifies a previously created media buy by updating packages or publisher properties. The update operation uses discriminated unions to specify what to change - either package details or targeting properties.

Args

request
Media buy update parameters including:

  • media_buy_id: Identifier from create_media_buy response
  • updates: Discriminated union specifying update type:
      ◦ UpdateMediaBuyPackagesRequest: Modify package selections
      ◦ UpdateMediaBuyPropertiesRequest: Change targeting properties

Returns

TaskResult containing UpdateMediaBuyResponse with:

  • media_buy_id: The updated media buy identifier
  • status: Updated state of the media buy
  • packages: Updated package configurations
  • Additional platform-specific metadata

Example

>>> from adcp import ADCPClient, UpdateMediaBuyPackagesRequest
>>> client = ADCPClient(agent_config)
>>> request = UpdateMediaBuyPackagesRequest(
...     media_buy_id="mb_123",
...     packages=[updated_package]
... )
>>> result = await client.update_media_buy(request)
>>> if result.success:
...     updated_packages = result.data.packages
async def update_property_list(self, request: UpdatePropertyListRequest) ‑> TaskResult[UpdatePropertyListResponse]
Expand source code
async def update_property_list(
    self,
    request: UpdatePropertyListRequest,
) -> TaskResult[UpdatePropertyListResponse]:
    """
    Apply updates to an existing property list.

    Modifies the filters, brand manifest, or other parameters of an
    existing property list. A PROTOCOL_REQUEST activity is emitted before
    the adapter call and a PROTOCOL_RESPONSE activity (carrying the raw
    result's status) after it.

    Args:
        request: Request parameters with list_id and updates

    Returns:
        TaskResult containing UpdatePropertyListResponse
    """
    task_name = "update_property_list"
    op_id = create_operation_id()
    # Serialize to JSON-compatible values; unset (None) fields are dropped.
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.update_property_list(payload)

    # Same operation id on both events so observers can pair them.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, UpdatePropertyListResponse)

Update a property list.

Modifies the filters, brand manifest, or other parameters of an existing property list.

Args

request
Request parameters with list_id and updates

Returns

TaskResult containing UpdatePropertyListResponse

async def update_rights(self, request: UpdateRightsRequest) ‑> TaskResult[Union[UpdateRightsResponse1, UpdateRightsResponse2]]
Expand source code
async def update_rights(
    self,
    request: UpdateRightsRequest,
) -> TaskResult[UpdateRightsResponse]:
    """Change the terms of an existing rights acquisition.

    Modifies a previously acquired rights record, typically to extend
    the ``end_date``, raise the ``impression_cap``, pause/unpause via
    ``paused``, or swap to a compatible ``pricing_option_id``. This is a
    partial update: pass only the fields you want to change.

    Failure modes (surface as ``TaskResult`` with ``success=False``):

    * Acquisition is expired or revoked: the seller rejects the update
      outright; mint a fresh ``acquire_rights`` instead.
    * ``pricing_option_id`` swap to an incompatible option: rejected;
      the new option's terms must be a strict superset / compatible
      with the original acquisition.
    * No partial-state mutations on rejection: the acquisition remains
      at its prior state when any field fails validation.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request with ``rights_id`` and at least one mutable
            field (``end_date``, ``impression_cap``, ``paused``, or
            ``pricing_option_id``).

    Returns:
        TaskResult containing UpdateRightsResponse (updated or error).
    """
    task_name = "update_rights"
    op_id = create_operation_id()
    # Serialize to JSON-compatible values; unset (None) fields are dropped,
    # which is what makes the partial-update form of the request work.
    payload = request.model_dump(mode="json", exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.update_rights(payload)

    # Same operation id on both events so observers can pair them.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, UpdateRightsResponse)

Update terms of an existing rights acquisition.

Modifies a previously acquired rights record — typically to extend the end_date, raise the impression_cap, pause/unpause via paused, or swap to a compatible pricing_option_id. Partial update: pass only the fields you want to change.

Failure modes (surface as TaskResult with success=False):

  • Acquisition is expired or revoked — the seller rejects the update outright; mint a fresh acquire_rights instead.
  • pricing_option_id swap to an incompatible option — rejected; the new option's terms must be a strict superset / compatible with the original acquisition.
  • No partial-state mutations on rejection: the acquisition remains at its prior state when any field fails validation.

Args

request
Request with rights_id and at least one mutable field (end_date, impression_cap, paused, or pricing_option_id).

Returns

TaskResult containing UpdateRightsResponse (updated or error).

def use_idempotency_key(self, key: str) ‑> Iterator[str]
Expand source code
@contextlib.contextmanager
def use_idempotency_key(self, key: str) -> Iterator[str]:
    """Pin an ``idempotency_key`` for the next mutating call on THIS client.

    Intended for keys you have persisted yourself (e.g., in a buyer-side
    database) and want the SDK to resend verbatim on resume or retry
    across process restarts. On entry the key is validated against
    ``^[A-Za-z0-9_.:-]{16,255}$``; a malformed key raises ``ValueError``.

    Scope rules:

    * **Single-use within scope.** The first mutating call inside the
      ``with`` block consumes the pinned key; a second mutating call falls
      through to a fresh UUID. This guards against ``asyncio.gather``
      siblings accidentally sharing the key (which would trigger
      ``IDEMPOTENCY_CONFLICT`` or silently duplicate work). To retry,
      wrap each attempt in its own ``with`` block.
    * **Client-scoped.** The pinned key applies only to calls on THIS
      client. A mutating call on a sibling ``ADCPClient`` inside the same
      ``with`` block generates a fresh key and emits a ``UserWarning``;
      keys must be unique per (seller, request) pair (AdCP #2315).
    * **No nesting.** Nested ``use_idempotency_key`` on the same client
      raises ``RuntimeError``.

    Example::

        with client.use_idempotency_key(campaign.stored_key):
            result = await client.create_media_buy(request)
    """
    from adcp import _idempotency

    # Validate first so a malformed key never gets registered.
    _idempotency.validate_key(key)

    client_token = self._idempotency_client_token
    already_pinned = client_token in _idempotency._scoped_keys
    if already_pinned:
        raise RuntimeError(
            "use_idempotency_key is already active on this client; "
            "nested usage is not supported."
        )

    _idempotency._scoped_keys[client_token] = key
    try:
        yield key
    finally:
        # pop() with a default: cleanup must not raise if the entry has
        # already been removed by the time the block exits.
        _idempotency._scoped_keys.pop(client_token, None)

Pin an idempotency_key for the next mutating call on THIS client.

Use when you've persisted a key (e.g., in a buyer-side database) and want the SDK to send that exact key on resume or retry across process restarts. The key is validated against ^[A-Za-z0-9_.:-]{16,255}$ on entry; a ValueError is raised for malformed keys.

Scope rules:

  • Single-use within scope. The first mutating call inside the with block consumes the pinned key; a second mutating call falls through to a fresh UUID. This protects against asyncio.gather siblings accidentally sharing the key (which would trigger IDEMPOTENCY_CONFLICT or silently duplicate work). If you need to retry, wrap each attempt in its own with block.
  • Client-scoped. The pinned key applies only to calls on THIS client. A mutating call on a sibling ADCPClient inside the same with block generates a fresh key and emits a UserWarning — keys must be unique per (seller, request) pair (AdCP #2315).
  • No nesting. Nested use_idempotency_key on the same client raises RuntimeError.

Example:

with client.use_idempotency_key(campaign.stored_key):
    result = await client.create_media_buy(request)
async def validate_content_delivery(self, request: ValidateContentDeliveryRequest) ‑> TaskResult[Union[ValidateContentDeliveryResponse1, ValidateContentDeliveryResponse2]]
Expand source code
async def validate_content_delivery(
    self,
    request: ValidateContentDeliveryRequest,
) -> TaskResult[ValidateContentDeliveryResponse]:
    """
    Check ad delivery records for compliance with content standards.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the adapter result's status) is
    emitted after it; both share one operation id.

    Args:
        request: Request parameters including delivery records

    Returns:
        TaskResult containing ValidateContentDeliveryResponse
    """
    task_type = "validate_content_delivery"
    operation_id = create_operation_id()
    # Serialize to JSON-compatible primitives, dropping unset (None) fields.
    payload = request.model_dump(mode="json", exclude_none=True)

    request_activity = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=operation_id,
        agent_id=self.agent_config.id,
        task_type=task_type,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_activity)

    adapter_result = await self.adapter.validate_content_delivery(payload)

    response_activity = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=operation_id,
        agent_id=self.agent_config.id,
        task_type=task_type,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_activity)

    return self.adapter._parse_response(adapter_result, ValidateContentDeliveryResponse)

Validate content delivery against standards.

Validates that ad delivery records comply with content standards.

Args

request
Request parameters including delivery records

Returns

TaskResult containing ValidateContentDeliveryResponse

class ADCPMultiAgentClient (agents: list[AgentConfig],
webhook_url_template: str | None = None,
webhook_secret: str | None = None,
on_activity: Callable[[Activity], None] | None = None,
handlers: dict[str, Callable[..., Any]] | None = None,
signing: SigningConfig | None = None)
Expand source code
class ADCPMultiAgentClient:
    """Client for managing multiple AdCP agents.

    Holds one ``ADCPClient`` per configured agent, keyed by agent id, and
    offers helpers that fan a call out to every agent.
    """

    def __init__(
        self,
        agents: list[AgentConfig],
        webhook_url_template: str | None = None,
        webhook_secret: str | None = None,
        on_activity: Callable[[Activity], None] | None = None,
        handlers: dict[str, Callable[..., Any]] | None = None,
        signing: SigningConfig | None = None,
    ):
        """
        Initialize multi-agent client.

        Args:
            agents: List of agent configurations
            webhook_url_template: Template for webhook URLs
            webhook_secret: Secret for webhook verification
            on_activity: Callback for activity events
            handlers: Task completion handlers
            signing: Optional RFC 9421 signing config forwarded to every
                per-agent ADCPClient. The same identity signs traffic to
                all agents. See ADCPClient.__init__ for details.
        """
        # One client per agent, keyed by agent id. If two configs share an
        # id, the later one replaces the earlier one in the mapping.
        self.agents = {
            agent.id: ADCPClient(
                agent,
                webhook_url_template=webhook_url_template,
                webhook_secret=webhook_secret,
                on_activity=on_activity,
                signing=signing,
            )
            for agent in agents
        }
        self.handlers = handlers or {}

    def agent(self, agent_id: str) -> ADCPClient:
        """Get client for specific agent.

        Args:
            agent_id: Id of a configured agent.

        Returns:
            The ADCPClient registered under ``agent_id``.

        Raises:
            ValueError: If no agent with that id is configured.
        """
        if agent_id not in self.agents:
            raise ValueError(f"Agent not found: {agent_id}")
        return self.agents[agent_id]

    @property
    def agent_ids(self) -> list[str]:
        """Get list of agent IDs."""
        return list(self.agents.keys())

    async def close(self) -> None:
        """Close all agent clients and clean up resources.

        Closing is best-effort: every client's ``close()`` is awaited even
        if some of them fail. Failures do not propagate, but each one is
        logged as a warning so it is not lost silently.
        """
        import asyncio

        logger.debug("Closing all agent clients in multi-agent client")
        entries = list(self.agents.items())
        outcomes = await asyncio.gather(
            *(client.close() for _, client in entries),
            return_exceptions=True,
        )
        # gather() preserves input order, so outcomes line up with entries.
        for (agent_id, _), outcome in zip(entries, outcomes):
            if isinstance(outcome, BaseException):
                logger.warning(
                    "Failed to close client for agent %s: %r", agent_id, outcome
                )

    async def __aenter__(self) -> ADCPMultiAgentClient:
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Async context manager exit."""
        await self.close()

    async def get_products(
        self,
        request: GetProductsRequest,
    ) -> list[TaskResult[GetProductsResponse]]:
        """
        Execute get_products across all agents in parallel.

        Results are returned in the iteration order of ``self.agents``. An
        exception raised by any agent's call propagates to the caller.

        Args:
            request: Request parameters

        Returns:
            List of TaskResults containing GetProductsResponse for each agent
        """
        import asyncio

        tasks = [agent.get_products(request) for agent in self.agents.values()]
        return await asyncio.gather(*tasks)

    @classmethod
    def from_env(cls) -> ADCPMultiAgentClient:
        """Create client from environment variables.

        Reads ``ADCP_AGENTS`` (a JSON array of agent configuration objects),
        plus the optional ``WEBHOOK_URL_TEMPLATE`` and ``WEBHOOK_SECRET``.

        Raises:
            ValueError: If ``ADCP_AGENTS`` is unset or empty, is not valid
                JSON (``json.JSONDecodeError`` is a ``ValueError``), or does
                not decode to a JSON array.
        """
        agents_json = os.getenv("ADCP_AGENTS")
        if not agents_json:
            raise ValueError("ADCP_AGENTS environment variable not set")

        agents_data = json.loads(agents_json)
        # A JSON object would otherwise be iterated key-by-key and fail with
        # an opaque TypeError when a string key is splatted into AgentConfig.
        if not isinstance(agents_data, list):
            raise ValueError("ADCP_AGENTS must be a JSON array of agent configurations")
        agents = [AgentConfig(**agent) for agent in agents_data]

        return cls(
            agents=agents,
            webhook_url_template=os.getenv("WEBHOOK_URL_TEMPLATE"),
            webhook_secret=os.getenv("WEBHOOK_SECRET"),
        )

Client for managing multiple AdCP agents.

Initialize multi-agent client.

Args

agents
List of agent configurations
webhook_url_template
Template for webhook URLs
webhook_secret
Secret for webhook verification
on_activity
Callback for activity events
handlers
Task completion handlers
signing
Optional RFC 9421 signing config forwarded to every per-agent ADCPClient. The same identity signs traffic to all agents. See ADCPClient.__init__ for details.

Static methods

def from_env() ‑> ADCPMultiAgentClient

Create client from environment variables.

Instance variables

prop agent_ids : list[str]
Expand source code
@property
def agent_ids(self) -> list[str]:
    """Return the ids of all configured agents, in mapping order."""
    return [*self.agents]

Get list of agent IDs.

Methods

def agent(self, agent_id: str) ‑> ADCPClient
Expand source code
def agent(self, agent_id: str) -> ADCPClient:
    """Look up the client registered for ``agent_id``.

    Raises ValueError when no agent with that id is configured.
    """
    registry = self.agents
    if agent_id in registry:
        return registry[agent_id]
    raise ValueError(f"Agent not found: {agent_id}")

Get client for specific agent.

async def close(self) ‑> None
Expand source code
async def close(self) -> None:
    """Close all agent clients and clean up resources.

    Closing is best-effort: every client's ``close()`` is awaited even if
    some of them fail. Failures do not propagate, but each one is logged
    as a warning so it is not lost silently.
    """
    import asyncio

    logger.debug("Closing all agent clients in multi-agent client")
    entries = list(self.agents.items())
    outcomes = await asyncio.gather(
        *(client.close() for _, client in entries),
        return_exceptions=True,
    )
    # gather() preserves input order, so outcomes line up with entries.
    for (agent_id, _), outcome in zip(entries, outcomes):
        if isinstance(outcome, BaseException):
            logger.warning(
                "Failed to close client for agent %s: %r", agent_id, outcome
            )

Close all agent clients and clean up resources.

async def get_products(self, request: GetProductsRequest) ‑> list[TaskResult[GetProductsResponse]]
Expand source code
async def get_products(
    self,
    request: GetProductsRequest,
) -> list[TaskResult[GetProductsResponse]]:
    """
    Fan get_products out to every configured agent concurrently.

    Args:
        request: Request parameters, passed unchanged to each agent

    Returns:
        One TaskResult containing GetProductsResponse per agent, in the
        iteration order of ``self.agents``
    """
    import asyncio

    pending = []
    for client in self.agents.values():
        pending.append(client.get_products(request))
    return await asyncio.gather(*pending)

Execute get_products across all agents in parallel.

Args

request
Request parameters

Returns

List of TaskResults containing GetProductsResponse for each agent