Module adcp.client

Classes

class ADCPClient (agent_config: AgentConfig,
webhook_url_template: str | None = None,
webhook_secret: str | None = None,
on_activity: Callable[[Activity], None] | None = None,
webhook_timestamp_tolerance: int = 300,
capabilities_ttl: float = 3600.0,
validate_features: bool = False)
Expand source code
class ADCPClient:
    """Client for interacting with a single AdCP agent."""

    def __init__(
        self,
        agent_config: AgentConfig,
        webhook_url_template: str | None = None,
        webhook_secret: str | None = None,
        on_activity: Callable[[Activity], None] | None = None,
        webhook_timestamp_tolerance: int = 300,
        capabilities_ttl: float = 3600.0,
        validate_features: bool = False,
    ):
        """
        Set up a client bound to exactly one AdCP agent.

        Args:
            agent_config: Agent configuration
            webhook_url_template: Template for webhook URLs with {agent_id},
                {task_type}, {operation_id}
            webhook_secret: Secret for webhook signature verification
            on_activity: Callback for activity events
            webhook_timestamp_tolerance: Maximum age (in seconds) for webhook
                timestamps. Webhooks with timestamps older than this or more than
                this far in the future are rejected. Defaults to 300 (5 minutes).
            capabilities_ttl: Time-to-live in seconds for cached capabilities (default: 1 hour)
            validate_features: When True, automatically check that the seller supports
                required features before making task calls (e.g., sync_audiences requires
                audience_targeting). Requires capabilities to have been fetched first.

        Raises:
            ValueError: If ``agent_config.protocol`` is neither A2A nor MCP.
        """
        # Capabilities cache starts empty; refresh_capabilities() populates it.
        self._capabilities: GetAdcpCapabilitiesResponse | None = None
        self._feature_resolver: FeatureResolver | None = None
        self._capabilities_fetched_at: float | None = None

        # Plain configuration, stored verbatim.
        self.agent_config = agent_config
        self.webhook_url_template = webhook_url_template
        self.webhook_secret = webhook_secret
        self.on_activity = on_activity
        self.webhook_timestamp_tolerance = webhook_timestamp_tolerance
        self.capabilities_ttl = capabilities_ttl
        self.validate_features = validate_features

        # Select the protocol adapter matching the agent's declared protocol.
        if agent_config.protocol == Protocol.A2A:
            adapter: ProtocolAdapter = A2AAdapter(agent_config)
        elif agent_config.protocol == Protocol.MCP:
            adapter = MCPAdapter(agent_config)
        else:
            raise ValueError(f"Unsupported protocol: {agent_config.protocol}")
        self.adapter = adapter

        # Imported here rather than at module level to avoid a circular dependency.
        from adcp.simple import SimpleAPI

        self.simple = SimpleAPI(self)

    def get_webhook_url(self, task_type: str, operation_id: str) -> str:
        """Generate webhook URL for a task."""
        if not self.webhook_url_template:
            raise ValueError("webhook_url_template not configured")

        return self.webhook_url_template.format(
            agent_id=self.agent_config.id,
            task_type=task_type,
            operation_id=operation_id,
        )

    def _emit_activity(self, activity: Activity) -> None:
        """Emit activity event."""
        if self.on_activity:
            self.on_activity(activity)

    # ========================================================================
    # Capability Validation
    # ========================================================================

    @property
    def capabilities(self) -> GetAdcpCapabilitiesResponse | None:
        """Return cached capabilities, or None if not yet fetched."""
        return self._capabilities

    @property
    def feature_resolver(self) -> FeatureResolver | None:
        """Return the FeatureResolver for cached capabilities, or None."""
        return self._feature_resolver

    async def fetch_capabilities(self) -> GetAdcpCapabilitiesResponse:
        """Fetch capabilities, using cache if still valid.

        Returns:
            The seller's capabilities response.
        """
        if self._capabilities is not None and self._capabilities_fetched_at is not None:
            elapsed = time.monotonic() - self._capabilities_fetched_at
            if elapsed < self.capabilities_ttl:
                return self._capabilities

        return await self.refresh_capabilities()

    async def refresh_capabilities(self) -> GetAdcpCapabilitiesResponse:
        """Query the seller for capabilities, ignoring any cached copy.

        On success the response, a FeatureResolver built from it, and the
        monotonic fetch time are stored on the client.

        Returns:
            The seller's capabilities response.

        Raises:
            ADCPError: If the call did not succeed or returned no data.
        """
        result = await self.get_adcp_capabilities(GetAdcpCapabilitiesRequest())

        if not result.success or result.data is None:
            raise ADCPError(
                f"Failed to fetch capabilities: {result.error or result.message}",
                agent_id=self.agent_config.id,
                agent_uri=self.agent_config.agent_uri,
            )

        fresh = result.data
        self._capabilities = fresh
        self._feature_resolver = FeatureResolver(fresh)
        self._capabilities_fetched_at = time.monotonic()
        return fresh

    def _ensure_resolver(self) -> FeatureResolver:
        """Return the FeatureResolver, raising if capabilities haven't been fetched."""
        if self._feature_resolver is None:
            raise ADCPError(
                "Cannot check feature support: capabilities have not been fetched. "
                "Call fetch_capabilities() first.",
                agent_id=self.agent_config.id,
                agent_uri=self.agent_config.agent_uri,
            )
        return self._feature_resolver

    def supports(self, feature: str) -> bool:
        """Check if the seller supports a feature.

        Supports multiple feature namespaces:
        - Protocol support: ``supports("media_buy")`` checks ``supported_protocols``
        - Extension support: ``supports("ext:scope3")`` checks ``extensions_supported``
        - Targeting: ``supports("targeting.geo_countries")`` checks
          ``media_buy.execution.targeting``
        - Media buy features: ``supports("audience_targeting")`` checks
          ``media_buy.features``
        - Signals features: ``supports("catalog_signals")`` checks
          ``signals.features``

        Args:
            feature: Feature identifier to check.

        Returns:
            True if the seller declares the feature as supported.

        Raises:
            ADCPError: If capabilities have not been fetched yet.
        """
        return self._ensure_resolver().supports(feature)

    def require(self, *features: str) -> None:
        """Assert that the seller supports all listed features.

        Args:
            *features: Feature identifiers to require.

        Raises:
            ADCPFeatureUnsupportedError: If any features are not supported.
            ADCPError: If capabilities have not been fetched yet.
        """
        self._ensure_resolver().require(
            *features,
            agent_id=self.agent_config.id,
            agent_uri=self.agent_config.agent_uri,
        )

    def _validate_task_features(self, task_name: str) -> None:
        """Check feature requirements for a task if validate_features is enabled.

        Returns without checking if validate_features is False or capabilities
        haven't been fetched yet (logs a warning in the latter case).
        """
        if not self.validate_features:
            return
        if self._feature_resolver is None:
            logger.warning(
                "validate_features is enabled but capabilities have not been fetched. "
                "Call fetch_capabilities() to enable feature validation."
            )
            return
        required_feature = TASK_FEATURE_MAP.get(task_name)
        if required_feature is None:
            return
        self.require(required_feature)

    async def get_products(
        self,
        request: GetProductsRequest,
        fetch_previews: bool = False,
        preview_output_format: str = "url",
        creative_agent_client: ADCPClient | None = None,
    ) -> TaskResult[GetProductsResponse]:
        """
        Get advertising products.

        Args:
            request: Request parameters
            fetch_previews: If True, generate preview URLs for each product's formats
                (uses batch API for 5-10x performance improvement)
            preview_output_format: "url" for iframe URLs (default), "html" for direct
                embedding (2-3x faster, no iframe overhead)
            creative_agent_client: Client for creative agent (required if
                fetch_previews=True)

        Returns:
            TaskResult containing GetProductsResponse with optional preview URLs in metadata

        Raises:
            ValueError: If fetch_previews=True but creative_agent_client is not provided
        """
        if fetch_previews and not creative_agent_client:
            raise ValueError("creative_agent_client is required when fetch_previews=True")

        operation_id = create_operation_id()
        params = request.model_dump(exclude_none=True)

        self._emit_activity(
            Activity(
                type=ActivityType.PROTOCOL_REQUEST,
                operation_id=operation_id,
                agent_id=self.agent_config.id,
                task_type="get_products",
                timestamp=datetime.now(timezone.utc).isoformat(),
            )
        )

        raw_result = await self.adapter.get_products(params)

        self._emit_activity(
            Activity(
                type=ActivityType.PROTOCOL_RESPONSE,
                operation_id=operation_id,
                agent_id=self.agent_config.id,
                task_type="get_products",
                status=raw_result.status,
                timestamp=datetime.now(timezone.utc).isoformat(),
            )
        )

        result: TaskResult[GetProductsResponse] = self.adapter._parse_response(
            raw_result, GetProductsResponse
        )

        if fetch_previews and result.success and result.data and creative_agent_client:
            from adcp.utils.preview_cache import add_preview_urls_to_products

            products_with_previews = await add_preview_urls_to_products(
                result.data.products,
                creative_agent_client,
                use_batch=True,
                output_format=preview_output_format,
            )
            result.metadata = result.metadata or {}
            result.metadata["products_with_previews"] = products_with_previews

        return result

    async def list_creative_formats(
        self,
        request: ListCreativeFormatsRequest,
        fetch_previews: bool = False,
        preview_output_format: str = "url",
    ) -> TaskResult[ListCreativeFormatsResponse]:
        """
        Retrieve the creative formats the agent supports.

        Emits a PROTOCOL_REQUEST activity before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the raw status) after it, then
        parses the raw result. When previews are requested and the parsed
        result is successful with data, preview data is stored under the
        ``"formats_with_previews"`` metadata key, using this client itself
        for the preview calls.

        Args:
            request: Request parameters
            fetch_previews: If True, generate preview URLs for each format using
                sample manifests (uses batch API for 5-10x performance improvement)
            preview_output_format: "url" for iframe URLs (default), "html" for direct
                embedding (2-3x faster, no iframe overhead)

        Returns:
            TaskResult containing ListCreativeFormatsResponse with optional preview URLs in metadata
        """
        task_name = "list_creative_formats"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.list_creative_formats(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        parsed: TaskResult[ListCreativeFormatsResponse] = self.adapter._parse_response(
            raw, ListCreativeFormatsResponse
        )

        # Previews are only attached to a successful, non-empty result.
        if not (fetch_previews and parsed.success and parsed.data):
            return parsed

        from adcp.utils.preview_cache import add_preview_urls_to_formats

        enriched = await add_preview_urls_to_formats(
            parsed.data.formats,
            self,
            use_batch=True,
            output_format=preview_output_format,
        )
        parsed.metadata = parsed.metadata or {}
        parsed.metadata["formats_with_previews"] = enriched
        return parsed

    async def preview_creative(
        self,
        request: PreviewCreativeRequest,
    ) -> TaskResult[PreviewCreativeResponse]:
        """
        Ask the agent to render a preview of a creative manifest.

        The request is serialized with ``model_dump(exclude_none=True)``; a
        PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing PreviewCreativeResponse with preview URLs
        """
        task_name = "preview_creative"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.preview_creative(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, PreviewCreativeResponse)

    async def sync_creatives(
        self,
        request: SyncCreativesRequest,
    ) -> TaskResult[SyncCreativesResponse]:
        """
        Run the agent's ``sync_creatives`` task.

        The request is serialized with ``model_dump(exclude_none=True)``; a
        PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing SyncCreativesResponse
        """
        task_name = "sync_creatives"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.sync_creatives(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, SyncCreativesResponse)

    async def list_creatives(
        self,
        request: ListCreativesRequest,
    ) -> TaskResult[ListCreativesResponse]:
        """
        Run the agent's ``list_creatives`` task.

        The request is serialized with ``model_dump(exclude_none=True)``; a
        PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing ListCreativesResponse
        """
        task_name = "list_creatives"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.list_creatives(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, ListCreativesResponse)

    async def get_media_buy_delivery(
        self,
        request: GetMediaBuyDeliveryRequest,
    ) -> TaskResult[GetMediaBuyDeliveryResponse]:
        """
        Run the agent's ``get_media_buy_delivery`` task.

        The request is serialized with ``model_dump(exclude_none=True)``; a
        PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing GetMediaBuyDeliveryResponse
        """
        task_name = "get_media_buy_delivery"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.get_media_buy_delivery(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, GetMediaBuyDeliveryResponse)

    async def get_media_buys(
        self,
        request: GetMediaBuysRequest,
    ) -> TaskResult[GetMediaBuysResponse]:
        """
        Run the agent's ``get_media_buys`` task.

        The request is serialized with ``model_dump(exclude_none=True)``; a
        PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing GetMediaBuysResponse
        """
        task_name = "get_media_buys"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.get_media_buys(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, GetMediaBuysResponse)

    async def get_signals(
        self,
        request: GetSignalsRequest,
    ) -> TaskResult[GetSignalsResponse]:
        """
        Run the agent's ``get_signals`` task.

        The request is serialized with ``model_dump(exclude_none=True)``; a
        PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing GetSignalsResponse
        """
        task_name = "get_signals"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.get_signals(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, GetSignalsResponse)

    async def activate_signal(
        self,
        request: ActivateSignalRequest,
    ) -> TaskResult[ActivateSignalResponse]:
        """
        Run the agent's ``activate_signal`` task.

        The request is serialized with ``model_dump(exclude_none=True)``; a
        PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing ActivateSignalResponse
        """
        task_name = "activate_signal"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.activate_signal(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, ActivateSignalResponse)

    async def provide_performance_feedback(
        self,
        request: ProvidePerformanceFeedbackRequest,
    ) -> TaskResult[ProvidePerformanceFeedbackResponse]:
        """
        Run the agent's ``provide_performance_feedback`` task.

        The request is serialized with ``model_dump(exclude_none=True)``; a
        PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing ProvidePerformanceFeedbackResponse
        """
        task_name = "provide_performance_feedback"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.provide_performance_feedback(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, ProvidePerformanceFeedbackResponse)

    async def create_media_buy(
        self,
        request: CreateMediaBuyRequest,
    ) -> TaskResult[CreateMediaBuyResponse]:
        """
        Reserve inventory by creating a new media buy.

        Asks the agent to reserve inventory for a campaign. The agent answers
        with a media_buy_id that tracks the reservation and can be used for
        later updates. A PROTOCOL_REQUEST activity is emitted before the
        adapter call and a PROTOCOL_RESPONSE activity after it.

        Args:
            request: Media buy creation parameters including:
                - brand_manifest: Advertiser brand information and creative assets
                - packages: List of package requests specifying desired inventory
                - publisher_properties: Target properties for ad placement
                - budget: Optional budget constraints
                - start_date/end_date: Campaign flight dates

        Returns:
            TaskResult containing CreateMediaBuyResponse with:
                - media_buy_id: Unique identifier for this reservation
                - status: Current state of the media buy
                - packages: Confirmed package details
                - Additional platform-specific metadata

        Example:
            >>> from adcp import ADCPClient, CreateMediaBuyRequest
            >>> client = ADCPClient(agent_config)
            >>> request = CreateMediaBuyRequest(
            ...     brand_manifest=brand,
            ...     packages=[package_request],
            ...     publisher_properties=properties
            ... )
            >>> result = await client.create_media_buy(request)
            >>> if result.success:
            ...     media_buy_id = result.data.media_buy_id
        """
        task_name = "create_media_buy"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.create_media_buy(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, CreateMediaBuyResponse)

    async def update_media_buy(
        self,
        request: UpdateMediaBuyRequest,
    ) -> TaskResult[UpdateMediaBuyResponse]:
        """
        Modify a media buy that was created earlier.

        Changes a previously created media buy by updating packages or
        publisher properties. The update uses discriminated unions to state
        what changes - either package details or targeting properties. A
        PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity after it.

        Args:
            request: Media buy update parameters including:
                - media_buy_id: Identifier from create_media_buy response
                - updates: Discriminated union specifying update type:
                    * UpdateMediaBuyPackagesRequest: Modify package selections
                    * UpdateMediaBuyPropertiesRequest: Change targeting properties

        Returns:
            TaskResult containing UpdateMediaBuyResponse with:
                - media_buy_id: The updated media buy identifier
                - status: Updated state of the media buy
                - packages: Updated package configurations
                - Additional platform-specific metadata

        Example:
            >>> from adcp import ADCPClient, UpdateMediaBuyPackagesRequest
            >>> client = ADCPClient(agent_config)
            >>> request = UpdateMediaBuyPackagesRequest(
            ...     media_buy_id="mb_123",
            ...     packages=[updated_package]
            ... )
            >>> result = await client.update_media_buy(request)
            >>> if result.success:
            ...     updated_packages = result.data.packages
        """
        task_name = "update_media_buy"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.update_media_buy(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, UpdateMediaBuyResponse)

    async def build_creative(
        self,
        request: BuildCreativeRequest,
    ) -> TaskResult[BuildCreativeResponse]:
        """
        Produce final, production-ready creative assets.

        Asks the creative agent to build deliverable assets in the target
        format (e.g., VAST, DAAST, HTML5). Typically called once a creative
        manifest has been previewed and approved. A PROTOCOL_REQUEST activity
        is emitted before the adapter call and a PROTOCOL_RESPONSE activity
        after it.

        Args:
            request: Creative build parameters including:
                - manifest: Creative manifest with brand info and content
                - target_format_id: Desired output format identifier
                - inputs: Optional user-provided inputs for template variables
                - deployment: Platform or agent deployment configuration

        Returns:
            TaskResult containing BuildCreativeResponse with:
                - assets: Production-ready creative files (URLs or inline content)
                - format_id: The generated format identifier
                - manifest: The creative manifest used for generation
                - metadata: Additional platform-specific details

        Example:
            >>> from adcp import ADCPClient, BuildCreativeRequest
            >>> client = ADCPClient(agent_config)
            >>> request = BuildCreativeRequest(
            ...     manifest=creative_manifest,
            ...     target_format_id="vast_2.0",
            ...     inputs={"duration": 30}
            ... )
            >>> result = await client.build_creative(request)
            >>> if result.success:
            ...     vast_url = result.data.assets[0].url
        """
        task_name = "build_creative"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.build_creative(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, BuildCreativeResponse)

    async def list_accounts(
        self,
        request: ListAccountsRequest,
    ) -> TaskResult[ListAccountsResponse]:
        """
        Run the agent's ``list_accounts`` task.

        The request is serialized with ``model_dump(exclude_none=True)``; a
        PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the raw status) after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing ListAccountsResponse
        """
        task_name = "list_accounts"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        raw = await self.adapter.list_accounts(payload)

        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=raw.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(raw, ListAccountsResponse)

    async def sync_accounts(
        self,
        request: SyncAccountsRequest,
    ) -> TaskResult[SyncAccountsResponse]:
        """
        Run the ``sync_accounts`` task against the agent.

        Serializes ``request`` with ``model_dump(exclude_none=True)``, emits a
        PROTOCOL_REQUEST activity, awaits ``self.adapter.sync_accounts`` and then
        emits a PROTOCOL_RESPONSE activity carrying the adapter result's status.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing SyncAccountsResponse, as produced by
            ``self.adapter._parse_response``
        """
        task_name = "sync_accounts"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.sync_accounts(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, SyncAccountsResponse)

    async def get_account_financials(
        self,
        request: GetAccountFinancialsRequest,
    ) -> TaskResult[GetAccountFinancialsResponse]:
        """
        Run the ``get_account_financials`` task against the agent.

        Serializes ``request`` with ``model_dump(exclude_none=True)``, emits a
        PROTOCOL_REQUEST activity, awaits ``self.adapter.get_account_financials``
        and then emits a PROTOCOL_RESPONSE activity carrying the adapter
        result's status.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing GetAccountFinancialsResponse, as produced by
            ``self.adapter._parse_response``
        """
        task_name = "get_account_financials"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.get_account_financials(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, GetAccountFinancialsResponse)

    async def report_usage(
        self,
        request: ReportUsageRequest,
    ) -> TaskResult[ReportUsageResponse]:
        """
        Run the ``report_usage`` task against the agent.

        Serializes ``request`` with ``model_dump(exclude_none=True)``, emits a
        PROTOCOL_REQUEST activity, awaits ``self.adapter.report_usage`` and then
        emits a PROTOCOL_RESPONSE activity carrying the adapter result's status.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing ReportUsageResponse, as produced by
            ``self.adapter._parse_response``
        """
        task_name = "report_usage"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.report_usage(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, ReportUsageResponse)

    async def log_event(
        self,
        request: LogEventRequest,
    ) -> TaskResult[LogEventResponse]:
        """
        Run the ``log_event`` task against the agent.

        Calls ``self._validate_task_features("log_event")`` first, then
        serializes ``request`` with ``model_dump(exclude_none=True)``, emits a
        PROTOCOL_REQUEST activity, awaits ``self.adapter.log_event`` and finally
        emits a PROTOCOL_RESPONSE activity carrying the adapter result's status.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing LogEventResponse, as produced by
            ``self.adapter._parse_response``
        """
        task_name = "log_event"
        # Feature check runs before any operation id is allocated.
        self._validate_task_features(task_name)
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.log_event(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, LogEventResponse)

    async def sync_event_sources(
        self,
        request: SyncEventSourcesRequest,
    ) -> TaskResult[SyncEventSourcesResponse]:
        """
        Run the ``sync_event_sources`` task against the agent.

        Calls ``self._validate_task_features("sync_event_sources")`` first, then
        serializes ``request`` with ``model_dump(exclude_none=True)``, emits a
        PROTOCOL_REQUEST activity, awaits ``self.adapter.sync_event_sources`` and
        finally emits a PROTOCOL_RESPONSE activity carrying the adapter result's
        status.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing SyncEventSourcesResponse, as produced by
            ``self.adapter._parse_response``
        """
        task_name = "sync_event_sources"
        # Feature check runs before any operation id is allocated.
        self._validate_task_features(task_name)
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.sync_event_sources(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, SyncEventSourcesResponse)

    async def sync_audiences(
        self,
        request: SyncAudiencesRequest,
    ) -> TaskResult[SyncAudiencesResponse]:
        """
        Sync Audiences.

        Runs the client's task feature check for ``sync_audiences`` before
        anything else. The constructor documents ``validate_features`` with
        this very task as its example ("sync_audiences requires
        audience_targeting"), so the check must be wired in here.

        After the check, serializes ``request`` with
        ``model_dump(exclude_none=True)``, emits a PROTOCOL_REQUEST activity,
        awaits ``self.adapter.sync_audiences`` and emits a PROTOCOL_RESPONSE
        activity carrying the adapter result's status.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing SyncAudiencesResponse

        Raises:
            Whatever ``self._validate_task_features`` raises when the check
            fails (exception type is defined outside this method).
        """
        # Validate up front so an unsupported task never reaches the adapter
        # and no request activity is emitted for it.
        self._validate_task_features("sync_audiences")
        operation_id = create_operation_id()
        params = request.model_dump(exclude_none=True)

        self._emit_activity(
            Activity(
                type=ActivityType.PROTOCOL_REQUEST,
                operation_id=operation_id,
                agent_id=self.agent_config.id,
                task_type="sync_audiences",
                timestamp=datetime.now(timezone.utc).isoformat(),
            )
        )

        raw_result = await self.adapter.sync_audiences(params)

        self._emit_activity(
            Activity(
                type=ActivityType.PROTOCOL_RESPONSE,
                operation_id=operation_id,
                agent_id=self.agent_config.id,
                task_type="sync_audiences",
                status=raw_result.status,
                timestamp=datetime.now(timezone.utc).isoformat(),
            )
        )

        return self.adapter._parse_response(raw_result, SyncAudiencesResponse)

    async def sync_catalogs(
        self,
        request: SyncCatalogsRequest,
    ) -> TaskResult[SyncCatalogsResponse]:
        """
        Run the ``sync_catalogs`` task against the agent.

        Serializes ``request`` with ``model_dump(exclude_none=True)``, emits a
        PROTOCOL_REQUEST activity, awaits ``self.adapter.sync_catalogs`` and then
        emits a PROTOCOL_RESPONSE activity carrying the adapter result's status.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing SyncCatalogsResponse, as produced by
            ``self.adapter._parse_response``
        """
        task_name = "sync_catalogs"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.sync_catalogs(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, SyncCatalogsResponse)

    async def get_creative_delivery(
        self,
        request: GetCreativeDeliveryRequest,
    ) -> TaskResult[GetCreativeDeliveryResponse]:
        """
        Run the ``get_creative_delivery`` task against the agent.

        Serializes ``request`` with ``model_dump(exclude_none=True)``, emits a
        PROTOCOL_REQUEST activity, awaits ``self.adapter.get_creative_delivery``
        and then emits a PROTOCOL_RESPONSE activity carrying the adapter
        result's status.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing GetCreativeDeliveryResponse, as produced by
            ``self.adapter._parse_response``
        """
        task_name = "get_creative_delivery"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.get_creative_delivery(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, GetCreativeDeliveryResponse)

    # ========================================================================
    # V3 Protocol Methods - Protocol Discovery
    # ========================================================================

    async def get_adcp_capabilities(
        self,
        request: GetAdcpCapabilitiesRequest,
    ) -> TaskResult[GetAdcpCapabilitiesResponse]:
        """
        Fetch the agent's AdCP capabilities.

        Asks the agent which AdCP features, protocol versions, and
        domain-specific capabilities (media_buy, signals,
        sponsored_intelligence) it supports. A PROTOCOL_REQUEST activity is
        emitted before the adapter call and a PROTOCOL_RESPONSE activity,
        carrying the adapter result's status, after it.

        Args:
            request: Request parameters including optional protocol filters

        Returns:
            TaskResult containing GetAdcpCapabilitiesResponse with:
                - adcp: Core protocol version information
                - supported_protocols: List of supported domain protocols
                - media_buy: Media buy capabilities (if supported)
                - sponsored_intelligence: SI capabilities (if supported)
                - signals: Signals capabilities (if supported)
        """
        task_name = "get_adcp_capabilities"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.get_adcp_capabilities(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, GetAdcpCapabilitiesResponse)

    # ========================================================================
    # V3 Protocol Methods - Content Standards
    # ========================================================================

    async def create_content_standards(
        self,
        request: CreateContentStandardsRequest,
    ) -> TaskResult[CreateContentStandardsResponse]:
        """
        Register a new content standards configuration.

        The configuration describes acceptable content contexts for ad
        placement through a natural language policy and optional calibration
        exemplars. A PROTOCOL_REQUEST activity is emitted before the adapter
        call and a PROTOCOL_RESPONSE activity, carrying the adapter result's
        status, after it.

        Args:
            request: Request parameters including policy and scope

        Returns:
            TaskResult containing CreateContentStandardsResponse with standards_id
        """
        task_name = "create_content_standards"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.create_content_standards(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, CreateContentStandardsResponse)

    async def get_content_standards(
        self,
        request: GetContentStandardsRequest,
    ) -> TaskResult[GetContentStandardsResponse]:
        """
        Look up a content standards configuration by ID.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity, carrying the adapter result's status,
        after it.

        Args:
            request: Request parameters including standards_id

        Returns:
            TaskResult containing GetContentStandardsResponse
        """
        task_name = "get_content_standards"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.get_content_standards(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, GetContentStandardsResponse)

    async def list_content_standards(
        self,
        request: ListContentStandardsRequest,
    ) -> TaskResult[ListContentStandardsResponse]:
        """
        Enumerate content standards configurations.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity, carrying the adapter result's status,
        after it.

        Args:
            request: Request parameters including optional filters

        Returns:
            TaskResult containing ListContentStandardsResponse
        """
        task_name = "list_content_standards"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.list_content_standards(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, ListContentStandardsResponse)

    async def update_content_standards(
        self,
        request: UpdateContentStandardsRequest,
    ) -> TaskResult[UpdateContentStandardsResponse]:
        """
        Modify an existing content standards configuration.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity, carrying the adapter result's status,
        after it.

        Args:
            request: Request parameters including standards_id and updates

        Returns:
            TaskResult containing UpdateContentStandardsResponse
        """
        task_name = "update_content_standards"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.update_content_standards(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, UpdateContentStandardsResponse)

    async def calibrate_content(
        self,
        request: CalibrateContentRequest,
    ) -> TaskResult[CalibrateContentResponse]:
        """
        Evaluate content against configured standards.

        The content (artifact or URL) is assessed against the standards to
        decide whether it is suitable for ad placement. A PROTOCOL_REQUEST
        activity is emitted before the adapter call and a PROTOCOL_RESPONSE
        activity, carrying the adapter result's status, after it.

        Args:
            request: Request parameters including content to evaluate

        Returns:
            TaskResult containing CalibrateContentResponse with verdict
        """
        task_name = "calibrate_content"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.calibrate_content(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, CalibrateContentResponse)

    async def validate_content_delivery(
        self,
        request: ValidateContentDeliveryRequest,
    ) -> TaskResult[ValidateContentDeliveryResponse]:
        """
        Check delivered ads against content standards.

        Verifies that ad delivery records comply with content standards. A
        PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity, carrying the adapter result's status,
        after it.

        Args:
            request: Request parameters including delivery records

        Returns:
            TaskResult containing ValidateContentDeliveryResponse
        """
        task_name = "validate_content_delivery"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.validate_content_delivery(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, ValidateContentDeliveryResponse)

    async def get_media_buy_artifacts(
        self,
        request: GetMediaBuyArtifactsRequest,
    ) -> TaskResult[GetMediaBuyArtifactsResponse]:
        """
        Fetch the artifacts tied to a media buy.

        Returns the content artifacts where ads were delivered for a media
        buy. A PROTOCOL_REQUEST activity is emitted before the adapter call and
        a PROTOCOL_RESPONSE activity, carrying the adapter result's status,
        after it.

        Args:
            request: Request parameters including media_buy_id

        Returns:
            TaskResult containing GetMediaBuyArtifactsResponse
        """
        task_name = "get_media_buy_artifacts"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.get_media_buy_artifacts(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, GetMediaBuyArtifactsResponse)

    # ========================================================================
    # V3 Protocol Methods - Sponsored Intelligence
    # ========================================================================

    async def si_get_offering(
        self,
        request: SiGetOfferingRequest,
    ) -> TaskResult[SiGetOfferingResponse]:
        """
        Fetch a sponsored intelligence offering.

        Returns product/service offerings that can be presented in a sponsored
        intelligence session. A PROTOCOL_REQUEST activity is emitted before the
        adapter call and a PROTOCOL_RESPONSE activity, carrying the adapter
        result's status, after it.

        Args:
            request: Request parameters including brand context

        Returns:
            TaskResult containing SiGetOfferingResponse
        """
        task_name = "si_get_offering"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.si_get_offering(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, SiGetOfferingResponse)

    async def si_initiate_session(
        self,
        request: SiInitiateSessionRequest,
    ) -> TaskResult[SiInitiateSessionResponse]:
        """
        Open a sponsored intelligence session.

        Begins a conversational brand experience session with a user. A
        PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity, carrying the adapter result's status,
        after it.

        Args:
            request: Request parameters including identity and context

        Returns:
            TaskResult containing SiInitiateSessionResponse with session_id
        """
        task_name = "si_initiate_session"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.si_initiate_session(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, SiInitiateSessionResponse)

    async def si_send_message(
        self,
        request: SiSendMessageRequest,
    ) -> TaskResult[SiSendMessageResponse]:
        """
        Post a message to a sponsored intelligence session.

        Carries on the conversation in an active SI session. A
        PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity, carrying the adapter result's status,
        after it.

        Args:
            request: Request parameters including session_id and message

        Returns:
            TaskResult containing SiSendMessageResponse with brand response
        """
        task_name = "si_send_message"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.si_send_message(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, SiSendMessageResponse)

    async def si_terminate_session(
        self,
        request: SiTerminateSessionRequest,
    ) -> TaskResult[SiTerminateSessionResponse]:
        """
        Close a sponsored intelligence session.

        Ends an active SI session, optionally with follow-up actions. A
        PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity, carrying the adapter result's status,
        after it.

        Args:
            request: Request parameters including session_id and termination context

        Returns:
            TaskResult containing SiTerminateSessionResponse
        """
        task_name = "si_terminate_session"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.si_terminate_session(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, SiTerminateSessionResponse)

    # ========================================================================
    # V3 Governance Methods
    # ========================================================================

    async def get_creative_features(
        self,
        request: GetCreativeFeaturesRequest,
    ) -> TaskResult[GetCreativeFeaturesResponse]:
        """
        Evaluate governance features for a creative manifest.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity, carrying the adapter result's status,
        after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing GetCreativeFeaturesResponse
        """
        task_name = "get_creative_features"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.get_creative_features(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, GetCreativeFeaturesResponse)

    async def sync_plans(
        self,
        request: SyncPlansRequest,
    ) -> TaskResult[SyncPlansResponse]:
        """
        Sync campaign governance plans to the governance agent.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity, carrying the adapter result's status,
        after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing SyncPlansResponse
        """
        task_name = "sync_plans"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.sync_plans(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, SyncPlansResponse)

    async def check_governance(
        self,
        request: CheckGovernanceRequest,
    ) -> TaskResult[CheckGovernanceResponse]:
        """
        Check a proposed or committed action against campaign governance.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity, carrying the adapter result's status,
        after it.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing CheckGovernanceResponse
        """
        task_name = "check_governance"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        # Announce the outgoing call before handing off to the adapter.
        request_event = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_event)

        reply = await self.adapter.check_governance(payload)

        # Report the adapter's status under the same operation id.
        response_event = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_event)

        return self.adapter._parse_response(reply, CheckGovernanceResponse)

    async def report_plan_outcome(
        self,
        request: ReportPlanOutcomeRequest,
    ) -> TaskResult[ReportPlanOutcomeResponse]:
        """
        Report the outcome of a governed action to the governance agent.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the raw result status) after it.

        Args:
            request: Request parameters; fields that are None are left out of
                the payload handed to the adapter.

        Returns:
            TaskResult containing ReportPlanOutcomeResponse
        """
        task_name = "report_plan_outcome"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_activity = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_activity)

        adapter_reply = await self.adapter.report_plan_outcome(payload)

        response_activity = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=adapter_reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_activity)

        return self.adapter._parse_response(adapter_reply, ReportPlanOutcomeResponse)

    async def get_plan_audit_logs(
        self,
        request: GetPlanAuditLogsRequest,
    ) -> TaskResult[GetPlanAuditLogsResponse]:
        """
        Retrieve governance state and audit logs for one or more plans.

        A PROTOCOL_REQUEST activity is emitted before the adapter call and a
        PROTOCOL_RESPONSE activity (carrying the raw result status) after it.

        Args:
            request: Request parameters; fields that are None are left out of
                the payload handed to the adapter.

        Returns:
            TaskResult containing GetPlanAuditLogsResponse
        """
        task_name = "get_plan_audit_logs"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_activity = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_activity)

        adapter_reply = await self.adapter.get_plan_audit_logs(payload)

        response_activity = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=adapter_reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_activity)

        return self.adapter._parse_response(adapter_reply, GetPlanAuditLogsResponse)

    async def create_property_list(
        self,
        request: CreatePropertyListRequest,
    ) -> TaskResult[CreatePropertyListResponse]:
        """
        Create a property list used for governance filtering.

        A property list describes a dynamic set of properties in terms of
        filters, brand manifests, and feature requirements.

        Args:
            request: Request parameters for creating the property list

        Returns:
            TaskResult containing CreatePropertyListResponse with list_id
        """
        task_name = "create_property_list"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_activity = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_activity)

        adapter_reply = await self.adapter.create_property_list(payload)

        response_activity = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=adapter_reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_activity)

        return self.adapter._parse_response(adapter_reply, CreatePropertyListResponse)

    async def get_property_list(
        self,
        request: GetPropertyListRequest,
    ) -> TaskResult[GetPropertyListResponse]:
        """
        Fetch a property list, optionally resolved.

        With resolve=true the response carries the resolved property
        identifiers, i.e. the actual properties matching the list's filters.

        Args:
            request: Request parameters including list_id and resolve flag

        Returns:
            TaskResult containing GetPropertyListResponse with identifiers
        """
        task_name = "get_property_list"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_activity = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_activity)

        adapter_reply = await self.adapter.get_property_list(payload)

        response_activity = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=adapter_reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_activity)

        return self.adapter._parse_response(adapter_reply, GetPropertyListResponse)

    async def list_property_lists(
        self,
        request: ListPropertyListsRequest,
    ) -> TaskResult[ListPropertyListsResponse]:
        """
        List the property lists owned by a principal.

        Returns metadata for all property lists; the request may narrow the
        result by principal or with pagination parameters.

        Args:
            request: Request parameters with optional filtering

        Returns:
            TaskResult containing ListPropertyListsResponse
        """
        task_name = "list_property_lists"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_activity = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_activity)

        adapter_reply = await self.adapter.list_property_lists(payload)

        response_activity = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=adapter_reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_activity)

        return self.adapter._parse_response(adapter_reply, ListPropertyListsResponse)

    async def update_property_list(
        self,
        request: UpdatePropertyListRequest,
    ) -> TaskResult[UpdatePropertyListResponse]:
        """
        Update an existing property list.

        Changes the filters, brand manifest, or other parameters of the list
        identified in the request.

        Args:
            request: Request parameters with list_id and updates

        Returns:
            TaskResult containing UpdatePropertyListResponse
        """
        task_name = "update_property_list"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_activity = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_activity)

        adapter_reply = await self.adapter.update_property_list(payload)

        response_activity = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=adapter_reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_activity)

        return self.adapter._parse_response(adapter_reply, UpdatePropertyListResponse)

    async def delete_property_list(
        self,
        request: DeletePropertyListRequest,
    ) -> TaskResult[DeletePropertyListResponse]:
        """
        Delete a property list.

        The list is removed; any active subscriptions to it will be
        terminated.

        Args:
            request: Request parameters with list_id

        Returns:
            TaskResult containing DeletePropertyListResponse
        """
        task_name = "delete_property_list"
        op_id = create_operation_id()
        payload = request.model_dump(exclude_none=True)

        request_activity = Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(request_activity)

        adapter_reply = await self.adapter.delete_property_list(payload)

        response_activity = Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=op_id,
            agent_id=self.agent_config.id,
            task_type=task_name,
            status=adapter_reply.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self._emit_activity(response_activity)

        return self.adapter._parse_response(adapter_reply, DeletePropertyListResponse)

    async def list_tools(self) -> list[str]:
        """
        Ask the adapter which tools the agent exposes.

        Returns:
            List of tool names
        """
        tool_names = await self.adapter.list_tools()
        return tool_names

    async def get_info(self) -> dict[str, Any]:
        """
        Fetch agent information, including AdCP extension metadata.

        The agent card information includes:
        - Agent name, description, version
        - Protocol type (mcp or a2a)
        - AdCP version (from extensions.adcp.adcp_version)
        - Supported protocols (from extensions.adcp.protocols_supported)
        - Available tools/skills

        Returns:
            Dictionary with agent metadata
        """
        agent_info = await self.adapter.get_agent_info()
        return agent_info

    async def close(self) -> None:
        """Close the adapter, if it exposes ``close``, to release its resources."""
        # Adapters without a close() have nothing to clean up.
        if not hasattr(self.adapter, "close"):
            return
        logger.debug(f"Closing adapter for agent {self.agent_config.id}")
        await self.adapter.close()

    async def __aenter__(self) -> ADCPClient:
        """Enter the async context; the client itself is the managed object."""
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Leave the async context, closing the client regardless of any exception."""
        # Exception details are ignored; returning None lets them propagate.
        await self.close()

    def _verify_webhook_signature(
        self,
        payload: dict[str, Any],
        signature: str,
        timestamp: str,
        raw_body: bytes | str | None = None,
    ) -> bool:
        """
        Verify HMAC-SHA256 signature of webhook payload.

        The verification algorithm matches get_adcp_signed_headers_for_webhook:
        1. Constructs message as "{timestamp}.{json_payload}"
        2. Uses raw HTTP body bytes when available (preserves sender's serialization)
        3. Falls back to json.dumps() if raw_body not provided
        4. HMAC-SHA256 signs with the shared secret
        5. Compares against the provided signature (with "sha256=" prefix stripped)

        The signed message is assembled as bytes, so a raw body that is not
        valid UTF-8 is hashed as-is instead of raising during decoding, and a
        signature containing non-ASCII characters is rejected instead of
        raising inside the constant-time comparison.

        Args:
            payload: Webhook payload dict (used as fallback if raw_body not provided)
            signature: Signature to verify (with or without "sha256=" prefix)
            timestamp: Unix timestamp in seconds from X-AdCP-Timestamp header
            raw_body: Raw HTTP request body bytes. When provided, used directly
                for signature verification to avoid cross-language serialization
                mismatches. Strongly recommended for production use.

        Returns:
            True if signature is valid (or no webhook_secret is configured),
            False otherwise
        """
        if not self.webhook_secret:
            logger.warning(
                "Webhook signature verification skipped: no webhook_secret configured"
            )
            return True

        # Reject stale or future timestamps to prevent replay attacks
        try:
            ts = int(timestamp)
        except (ValueError, TypeError):
            return False
        now = int(time.time())
        if abs(now - ts) > self.webhook_timestamp_tolerance:
            return False

        # Strip "sha256=" prefix if present
        prefix = "sha256="
        if signature.startswith(prefix):
            signature = signature[len(prefix):]

        # A hex digest is pure ASCII, so a non-ASCII signature can never match.
        # Rejecting it here also keeps hmac.compare_digest from raising
        # TypeError on attacker-controlled header values.
        try:
            provided_signature = signature.encode("ascii")
        except UnicodeEncodeError:
            return False

        # Use raw body if available (avoids cross-language serialization mismatches),
        # otherwise fall back to json.dumps() for backward compatibility.
        # Working in bytes means the sender's exact bytes are what gets hashed.
        if raw_body is None:
            body_bytes = json.dumps(payload).encode("utf-8")
        elif isinstance(raw_body, str):
            body_bytes = raw_body.encode("utf-8")
        else:
            body_bytes = bytes(raw_body)

        # Construct signed message: timestamp.payload
        signed_message = f"{timestamp}.".encode("utf-8") + body_bytes

        # Generate expected signature
        expected_signature = hmac.new(
            self.webhook_secret.encode("utf-8"), signed_message, hashlib.sha256
        ).hexdigest()

        return hmac.compare_digest(provided_signature, expected_signature.encode("ascii"))

    def _parse_webhook_result(
        self,
        task_id: str,
        task_type: str,
        operation_id: str,
        status: GeneratedTaskStatus,
        result: Any,
        timestamp: datetime | str,
        message: str | None,
        context_id: str | None,
    ) -> TaskResult[AdcpAsyncResponseData]:
        """
        Parse webhook data into typed TaskResult based on task_type.

        A completed webhook whose task_type has a known response type is parsed
        into that type. Everything else (other statuses, unknown task types,
        or results that fail to parse) is returned with the result untouched
        and, when available, the first entry of ``result.errors`` as the error
        message. Both paths report the same metadata keys.

        Args:
            task_id: Unique identifier for this task
            task_type: Task type from application routing (e.g., "get_products")
            operation_id: Operation identifier from application routing
            status: Current task status
            result: Task-specific payload (AdCP response data); may be a model
                object or a plain dict
            timestamp: ISO 8601 timestamp when webhook was generated
            message: Human-readable summary of task state
            context_id: Session/conversation identifier

        Returns:
            TaskResult with task-specific typed response data

        Note:
            This method works with both MCP and A2A protocols by accepting
            protocol-agnostic parameters rather than protocol-specific objects.
        """
        from adcp.utils.response_parser import parse_json_or_text

        # Map task types to their response types (using string literals, not enum)
        # Note: Some response types are Union types (e.g., ActivateSignalResponse = Success | Error)
        response_type_map: dict[str, type[BaseModel] | Any] = {
            # Core operations
            "get_products": GetProductsResponse,
            "list_creative_formats": ListCreativeFormatsResponse,
            "sync_creatives": SyncCreativesResponse,
            "list_creatives": ListCreativesResponse,
            "build_creative": BuildCreativeResponse,
            "preview_creative": PreviewCreativeResponse,
            "create_media_buy": CreateMediaBuyResponse,
            "update_media_buy": UpdateMediaBuyResponse,
            "get_media_buy_delivery": GetMediaBuyDeliveryResponse,
            "get_media_buys": GetMediaBuysResponse,
            "get_signals": GetSignalsResponse,
            "activate_signal": ActivateSignalResponse,
            "provide_performance_feedback": ProvidePerformanceFeedbackResponse,
            "report_usage": ReportUsageResponse,
            "get_account_financials": GetAccountFinancialsResponse,
            "list_accounts": ListAccountsResponse,
            "sync_accounts": SyncAccountsResponse,
            "log_event": LogEventResponse,
            "sync_event_sources": SyncEventSourcesResponse,
            "sync_audiences": SyncAudiencesResponse,
            "sync_catalogs": SyncCatalogsResponse,
            "get_creative_delivery": GetCreativeDeliveryResponse,
            # V3 Protocol Discovery
            "get_adcp_capabilities": GetAdcpCapabilitiesResponse,
            # V3 Content Standards
            "create_content_standards": CreateContentStandardsResponse,
            "get_content_standards": GetContentStandardsResponse,
            "list_content_standards": ListContentStandardsResponse,
            "update_content_standards": UpdateContentStandardsResponse,
            "calibrate_content": CalibrateContentResponse,
            "validate_content_delivery": ValidateContentDeliveryResponse,
            "get_media_buy_artifacts": GetMediaBuyArtifactsResponse,
            # V3 Sponsored Intelligence
            "si_get_offering": SiGetOfferingResponse,
            "si_initiate_session": SiInitiateSessionResponse,
            "si_send_message": SiSendMessageResponse,
            "si_terminate_session": SiTerminateSessionResponse,
            # V3 Governance
            "get_creative_features": GetCreativeFeaturesResponse,
            "sync_plans": SyncPlansResponse,
            "check_governance": CheckGovernanceResponse,
            "report_plan_outcome": ReportPlanOutcomeResponse,
            "get_plan_audit_logs": GetPlanAuditLogsResponse,
            "create_property_list": CreatePropertyListResponse,
            "get_property_list": GetPropertyListResponse,
            "list_property_lists": ListPropertyListsResponse,
            "update_property_list": UpdatePropertyListResponse,
            "delete_property_list": DeletePropertyListResponse,
        }

        # Shared by the typed and untyped paths so callers can always
        # correlate a result with its session via context_id.
        metadata: dict[str, Any] = {
            "task_id": task_id,
            "operation_id": operation_id,
            "timestamp": timestamp,
            "message": message,
            "context_id": context_id,
        }

        # Handle completed tasks with result parsing
        if status == GeneratedTaskStatus.completed and result is not None:
            response_type = response_type_map.get(task_type)
            if response_type:
                try:
                    parsed_result: Any = parse_json_or_text(result, response_type)
                    return TaskResult[AdcpAsyncResponseData](
                        status=TaskStatus.COMPLETED,
                        data=parsed_result,
                        success=True,
                        metadata=dict(metadata),
                    )
                except ValueError as e:
                    logger.warning(f"Failed to parse webhook result: {e}")
                    # Fall through to untyped result

        # Handle failed, input-required, or unparseable results
        # Convert status to core TaskStatus enum
        status_map = {
            GeneratedTaskStatus.completed: TaskStatus.COMPLETED,
            GeneratedTaskStatus.submitted: TaskStatus.SUBMITTED,
            GeneratedTaskStatus.working: TaskStatus.WORKING,
            GeneratedTaskStatus.failed: TaskStatus.FAILED,
            GeneratedTaskStatus.input_required: TaskStatus.NEEDS_INPUT,
        }
        # Statuses without a mapping are reported as failed.
        task_status = status_map.get(status, TaskStatus.FAILED)

        # Extract error message from result.errors if present. The result may
        # be a model object (attribute access) or a plain dict (key access),
        # e.g. the data of an A2A DataPart.
        error_message: str | None = None
        if result is not None:
            if isinstance(result, dict):
                errors = result.get("errors")
            else:
                errors = getattr(result, "errors", None)
            if errors:
                first_error = errors[0]
                if isinstance(first_error, dict):
                    error_message = first_error.get("message")
                else:
                    error_message = getattr(first_error, "message", None)

        return TaskResult[AdcpAsyncResponseData](
            status=task_status,
            data=result,
            success=status == GeneratedTaskStatus.completed,
            error=error_message,
            metadata=dict(metadata),
        )

    async def _handle_mcp_webhook(
        self,
        payload: dict[str, Any],
        task_type: str,
        operation_id: str,
        signature: str | None,
        timestamp: str | None = None,
        raw_body: bytes | str | None = None,
    ) -> TaskResult[AdcpAsyncResponseData]:
        """
        Process an MCP webhook that arrived as an HTTP POST.

        When a webhook_secret is configured the signature and timestamp
        headers are mandatory and are verified before the payload is
        validated. A WEBHOOK_RECEIVED activity is emitted once the payload
        has passed schema validation.

        Args:
            payload: Webhook payload dict
            task_type: Task type from application routing
            operation_id: Operation identifier from application routing
            signature: Optional HMAC-SHA256 signature for verification (X-AdCP-Signature header)
            timestamp: Optional Unix timestamp for signature verification (X-AdCP-Timestamp header)
            raw_body: Optional raw HTTP request body for signature verification

        Returns:
            TaskResult with parsed task-specific response data

        Raises:
            ADCPWebhookSignatureError: If signature verification fails
            ValidationError: If payload doesn't match McpWebhookPayload schema
        """
        from adcp.types.generated_poc.core.mcp_webhook_payload import McpWebhookPayload

        # Signed webhooks are only enforced when a secret has been configured.
        if self.webhook_secret:
            if not (signature and timestamp):
                raise ADCPWebhookSignatureError(
                    "Webhook signature and timestamp headers are required"
                )
            signature_ok = self._verify_webhook_signature(
                payload, signature, timestamp, raw_body
            )
            if not signature_ok:
                logger.warning(
                    f"Webhook signature verification failed for agent {self.agent_config.id}"
                )
                raise ADCPWebhookSignatureError("Invalid webhook signature")

        # Schema validation; raises if the payload is malformed.
        parsed = McpWebhookPayload.model_validate(payload)

        # Report the webhook to the activity callback for monitoring.
        received = Activity(
            type=ActivityType.WEBHOOK_RECEIVED,
            operation_id=operation_id,
            agent_id=self.agent_config.id,
            task_type=task_type,
            timestamp=datetime.now(timezone.utc).isoformat(),
            metadata={"payload": payload, "protocol": "mcp"},
        )
        self._emit_activity(received)

        # Hand the protocol-agnostic fields to the shared result parser.
        return self._parse_webhook_result(
            task_id=parsed.task_id,
            task_type=task_type,
            operation_id=operation_id,
            status=parsed.status,
            result=parsed.result,
            timestamp=parsed.timestamp,
            message=parsed.message,
            context_id=parsed.context_id,
        )

    async def _handle_a2a_webhook(
        self, payload: Task | TaskStatusUpdateEvent, task_type: str, operation_id: str
    ) -> TaskResult[AdcpAsyncResponseData]:
        """
        Handle A2A webhook delivered through Task or TaskStatusUpdateEvent.

        Per A2A specification:
        - Terminated statuses (completed, failed): Payload is Task with artifacts[].parts[]
        - Intermediate statuses (working, input-required, submitted):
          Payload is TaskStatusUpdateEvent with status.message.parts[]

        Both payload kinds are reduced to a list of parts; the structured AdCP
        data and the human-readable text are then extracted from those parts
        by the same logic.

        Args:
            payload: A2A Task or TaskStatusUpdateEvent object
            task_type: Task type from application routing
            operation_id: Operation identifier from application routing

        Returns:
            TaskResult with parsed task-specific response data

        Note:
            Signature verification is NOT applicable for A2A webhooks
            as they arrive through authenticated A2A connections, not HTTP.
        """
        from a2a.types import DataPart, TextPart

        def _extract_parts(parts: Any) -> tuple[Any, str | None]:
            """Return (AdCP data, text message) found in an A2A parts list."""
            if not parts:
                return None, None

            data: Any = None
            # Extract DataPart for structured AdCP payload
            data_parts = [p.root for p in parts if isinstance(p.root, DataPart)]
            if data_parts:
                # Use last DataPart as authoritative
                data = data_parts[-1].data

                # Unwrap {"response": {...}} wrapper if present (ADK pattern)
                if isinstance(data, dict) and "response" in data:
                    data = data["response"]

            # Extract first TextPart for human-readable message
            text = next(
                (p.root.text for p in parts if isinstance(p.root, TextPart)),
                None,
            )
            return data, text

        task_id: str
        parts: Any = None

        # Type detection: the two payload kinds differ only in where the task
        # id and the parts live.
        if isinstance(payload, TaskStatusUpdateEvent):
            # Intermediate status: parts come from status.message.parts[]
            task_id = payload.task_id
            if payload.status and payload.status.message:
                parts = payload.status.message.parts
        else:
            # Terminated status (Task): parts come from artifacts[].parts[]
            # Following A2A spec: use last artifact (most recent in streaming scenarios)
            task_id = payload.id
            if payload.artifacts:
                parts = payload.artifacts[-1].parts

        context_id: str | None = payload.context_id
        # A payload without status is treated as failed.
        status_state: str = payload.status.state if payload.status else "failed"
        timestamp: datetime | str = (
            payload.status.timestamp
            if payload.status and payload.status.timestamp
            else datetime.now(timezone.utc)
        )

        adcp_data, text_message = _extract_parts(parts)

        # Map A2A status.state to GeneratedTaskStatus enum
        status_map = {
            "completed": GeneratedTaskStatus.completed,
            "submitted": GeneratedTaskStatus.submitted,
            "working": GeneratedTaskStatus.working,
            "failed": GeneratedTaskStatus.failed,
            "input-required": GeneratedTaskStatus.input_required,
            "input_required": GeneratedTaskStatus.input_required,  # Handle both formats
        }
        # States without a mapping are reported as failed.
        mapped_status = status_map.get(status_state, GeneratedTaskStatus.failed)

        # Emit activity for monitoring
        self._emit_activity(
            Activity(
                type=ActivityType.WEBHOOK_RECEIVED,
                operation_id=operation_id,
                agent_id=self.agent_config.id,
                task_type=task_type,
                timestamp=datetime.now(timezone.utc).isoformat(),
                metadata={
                    "task_id": task_id,
                    "protocol": "a2a",
                    "payload_type": (
                        "TaskStatusUpdateEvent"
                        if isinstance(payload, TaskStatusUpdateEvent)
                        else "Task"
                    ),
                },
            )
        )

        # Parse and return typed result by passing extracted fields directly
        return self._parse_webhook_result(
            task_id=task_id,
            task_type=task_type,
            operation_id=operation_id,
            status=mapped_status,
            result=adcp_data,
            timestamp=timestamp,
            message=text_message,
            context_id=context_id,
        )

    async def handle_webhook(
        self,
        payload: dict[str, Any] | Task | TaskStatusUpdateEvent,
        task_type: str,
        operation_id: str,
        signature: str | None = None,
        timestamp: str | None = None,
        raw_body: bytes | str | None = None,
    ) -> TaskResult[AdcpAsyncResponseData]:
        """
        Handle incoming webhook and return typed result.

        This method provides a unified interface for handling webhooks from both
        MCP and A2A protocols:

        - MCP Webhooks: HTTP POST with dict payload, optional HMAC signature
        - A2A Webhooks: Task or TaskStatusUpdateEvent objects based on status

        The method automatically detects the protocol type and routes to the
        appropriate handler. Both protocols return a consistent TaskResult
        structure with typed AdCP response data.

        Args:
            payload: Webhook payload - one of:
                - dict[str, Any]: MCP webhook payload from HTTP POST
                - Task: A2A webhook for terminated statuses (completed, failed)
                - TaskStatusUpdateEvent: A2A webhook for intermediate statuses
                  (working, input-required, submitted)
            task_type: Task type from application routing (e.g., "get_products").
                Applications should extract this from URL routing pattern:
                /webhook/{task_type}/{agent_id}/{operation_id}
            operation_id: Operation identifier from application routing.
                Used to correlate webhook notifications with original task submission.
            signature: HMAC-SHA256 signature for MCP webhook verification
                (X-AdCP-Signature header). Required for MCP webhooks when the
                client has a webhook_secret configured. Ignored for A2A webhooks.
            timestamp: Unix timestamp (seconds) for MCP webhook signature
                verification (X-AdCP-Timestamp header). Required for MCP webhooks
                when the client has a webhook_secret configured. Ignored for A2A
                webhooks.
            raw_body: Optional raw HTTP request body bytes for signature verification.
                When provided, used directly instead of re-serializing the payload,
                avoiding cross-language JSON serialization mismatches. Strongly
                recommended for production use.

        Returns:
            TaskResult with parsed task-specific response data. The structure
            is identical regardless of protocol.

        Raises:
            ADCPWebhookSignatureError: If MCP signature verification fails, or
                the signature/timestamp headers are missing while a
                webhook_secret is configured
            ValidationError: If MCP payload doesn't match McpWebhookPayload schema

        Note:
            task_type and operation_id were deprecated from the webhook payload
            per AdCP specification. Applications must extract these from URL
            routing and pass them explicitly.

        Examples:
            MCP webhook (HTTP endpoint):
            >>> @app.post("/webhook/{task_type}/{agent_id}/{operation_id}")
            >>> async def webhook_handler(task_type: str, operation_id: str, request: Request):
            >>>     raw_body = await request.body()
            >>>     payload = json.loads(raw_body)
            >>>     signature = request.headers.get("X-AdCP-Signature")
            >>>     timestamp = request.headers.get("X-AdCP-Timestamp")
            >>>     result = await client.handle_webhook(
            >>>         payload, task_type, operation_id, signature, timestamp,
            >>>         raw_body=raw_body,
            >>>     )
            >>>     if result.success:
            >>>         print(f"Task completed: {result.data}")

            A2A webhook with Task (terminated status):
            >>> async def on_task_completed(task: Task):
            >>>     # Extract task_type and operation_id from your app's task tracking
            >>>     task_type = your_task_registry.get_type(task.id)
            >>>     operation_id = your_task_registry.get_operation_id(task.id)
            >>>     result = await client.handle_webhook(
            >>>         task, task_type, operation_id
            >>>     )
            >>>     if result.success:
            >>>         print(f"Task completed: {result.data}")

            A2A webhook with TaskStatusUpdateEvent (intermediate status):
            >>> async def on_task_update(event: TaskStatusUpdateEvent):
            >>>     # Extract task_type and operation_id from your app's task tracking
            >>>     task_type = your_task_registry.get_type(event.task_id)
            >>>     operation_id = your_task_registry.get_operation_id(event.task_id)
            >>>     result = await client.handle_webhook(
            >>>         event, task_type, operation_id
            >>>     )
            >>>     if result.status == TaskStatus.WORKING:
            >>>         print(f"Task still working: {result.metadata.get('message')}")
        """
        # Detect protocol type and route to appropriate handler
        if isinstance(payload, (Task, TaskStatusUpdateEvent)):
            # A2A webhook (Task or TaskStatusUpdateEvent); signature arguments
            # do not apply and are not forwarded.
            return await self._handle_a2a_webhook(payload, task_type, operation_id)

        # MCP webhook (dict payload)
        return await self._handle_mcp_webhook(
            payload, task_type, operation_id, signature, timestamp, raw_body
        )

Client for interacting with a single AdCP agent.

Initialize ADCP client for a single agent.

Args

agent_config
Agent configuration
webhook_url_template
Template for webhook URLs with {agent_id}, {task_type}, {operation_id}
webhook_secret
Secret for webhook signature verification
on_activity
Callback for activity events
webhook_timestamp_tolerance
Maximum age (in seconds) for webhook timestamps. Webhooks with timestamps older than this or more than this far in the future are rejected. Defaults to 300 (5 minutes).
capabilities_ttl
Time-to-live in seconds for cached capabilities (default: 1 hour)
validate_features
When True, automatically check that the seller supports required features before making task calls (e.g., sync_audiences requires audience_targeting). Requires capabilities to have been fetched first.

Instance variables

prop capabilities : GetAdcpCapabilitiesResponse | None
Expand source code
@property
def capabilities(self) -> GetAdcpCapabilitiesResponse | None:
    """Expose the capabilities response currently held in the cache.

    Returns:
        The value stored in ``self._capabilities``; ``None`` means nothing
        has been fetched yet.
    """
    cached = self._capabilities
    return cached

Return cached capabilities, or None if not yet fetched.

prop feature_resolver : FeatureResolver | None
Expand source code
@property
def feature_resolver(self) -> FeatureResolver | None:
    """Expose the FeatureResolver associated with the cached capabilities.

    Returns:
        The value stored in ``self._feature_resolver``, which may be ``None``.
    """
    resolver = self._feature_resolver
    return resolver

Return the FeatureResolver for cached capabilities, or None.

Methods

async def activate_signal(self, request: ActivateSignalRequest) ‑> TaskResult[Union[ActivateSignalResponse1, ActivateSignalResponse2]]
Expand source code
async def activate_signal(
    self,
    request: ActivateSignalRequest,
) -> TaskResult[ActivateSignalResponse]:
    """
    Activate Signal.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the adapter result status) after it.

    Args:
        request: Request parameters; serialized with None-valued fields dropped

    Returns:
        TaskResult containing ActivateSignalResponse
    """
    task_name = "activate_signal"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.activate_signal(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, ActivateSignalResponse)

Activate Signal.

Args

request
Request parameters

Returns

TaskResult containing ActivateSignalResponse

async def build_creative(self, request: BuildCreativeRequest) ‑> TaskResult[Union[BuildCreativeResponse1, BuildCreativeResponse2, BuildCreativeResponse3]]
Expand source code
async def build_creative(
    self,
    request: BuildCreativeRequest,
) -> TaskResult[BuildCreativeResponse]:
    """
    Build production-ready creative assets.

    Asks the creative agent to produce the final deliverable assets in the
    target format (e.g., VAST, DAAST, HTML5). Typically invoked once a creative
    manifest has been previewed and approved. A PROTOCOL_REQUEST activity is
    emitted before the adapter call and a PROTOCOL_RESPONSE activity after it.

    Args:
        request: Creative build parameters including:
            - manifest: Creative manifest with brand info and content
            - target_format_id: Desired output format identifier
            - inputs: Optional user-provided inputs for template variables
            - deployment: Platform or agent deployment configuration

    Returns:
        TaskResult containing BuildCreativeResponse with:
            - assets: Production-ready creative files (URLs or inline content)
            - format_id: The generated format identifier
            - manifest: The creative manifest used for generation
            - metadata: Additional platform-specific details

    Example:
        >>> from adcp import ADCPClient, BuildCreativeRequest
        >>> client = ADCPClient(agent_config)
        >>> request = BuildCreativeRequest(
        ...     manifest=creative_manifest,
        ...     target_format_id="vast_2.0",
        ...     inputs={"duration": 30}
        ... )
        >>> result = await client.build_creative(request)
        >>> if result.success:
        ...     vast_url = result.data.assets[0].url
    """
    task_name = "build_creative"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.build_creative(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, BuildCreativeResponse)

Generate production-ready creative assets.

Requests the creative agent to build final deliverable assets in the target format (e.g., VAST, DAAST, HTML5). This is typically called after previewing and approving a creative manifest.

Args

request
Creative build parameters including:
- manifest: Creative manifest with brand info and content
- target_format_id: Desired output format identifier
- inputs: Optional user-provided inputs for template variables
- deployment: Platform or agent deployment configuration

Returns

TaskResult containing BuildCreativeResponse with:
- assets: Production-ready creative files (URLs or inline content)
- format_id: The generated format identifier
- manifest: The creative manifest used for generation
- metadata: Additional platform-specific details

Example

>>> from adcp import ADCPClient, BuildCreativeRequest
>>> client = ADCPClient(agent_config)
>>> request = BuildCreativeRequest(
...     manifest=creative_manifest,
...     target_format_id="vast_2.0",
...     inputs={"duration": 30}
... )
>>> result = await client.build_creative(request)
>>> if result.success:
...     vast_url = result.data.assets[0].url
async def calibrate_content(self, request: CalibrateContentRequest) ‑> TaskResult[Union[CalibrateContentResponse1, CalibrateContentResponse2]]
Expand source code
async def calibrate_content(
    self,
    request: CalibrateContentRequest,
) -> TaskResult[CalibrateContentResponse]:
    """
    Calibrate content against standards.

    Content (an artifact or a URL) is evaluated against the configured
    standards to decide whether it is suitable for ad placement. A
    PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity after it.

    Args:
        request: Request parameters including content to evaluate

    Returns:
        TaskResult containing CalibrateContentResponse with verdict
    """
    task_name = "calibrate_content"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.calibrate_content(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, CalibrateContentResponse)

Calibrate content against standards.

Evaluates content (artifact or URL) against configured standards to determine suitability for ad placement.

Args

request
Request parameters including content to evaluate

Returns

TaskResult containing CalibrateContentResponse with verdict

async def check_governance(self, request: CheckGovernanceRequest) ‑> TaskResult[CheckGovernanceResponse]
Expand source code
async def check_governance(
    self,
    request: CheckGovernanceRequest,
) -> TaskResult[CheckGovernanceResponse]:
    """
    Check a proposed or committed action against campaign governance.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the adapter result status) after it.

    Args:
        request: Request parameters; serialized with None-valued fields dropped

    Returns:
        TaskResult containing CheckGovernanceResponse
    """
    task_name = "check_governance"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.check_governance(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, CheckGovernanceResponse)

Check a proposed or committed action against campaign governance.

async def close(self) ‑> None
Expand source code
async def close(self) -> None:
    """Shut down the adapter, releasing whatever resources it holds.

    Does nothing when the adapter exposes no ``close`` attribute; otherwise
    logs a debug message and awaits ``adapter.close()``.
    """
    if not hasattr(self.adapter, "close"):
        return

    logger.debug(f"Closing adapter for agent {self.agent_config.id}")
    await self.adapter.close()

Close the adapter and clean up resources.

async def create_content_standards(self, request: CreateContentStandardsRequest) ‑> TaskResult[Union[CreateContentStandardsResponse1, CreateContentStandardsResponse2]]
Expand source code
async def create_content_standards(
    self,
    request: CreateContentStandardsRequest,
) -> TaskResult[CreateContentStandardsResponse]:
    """
    Create a new content standards configuration.

    The configuration describes acceptable content contexts for ad placement
    through a natural language policy plus optional calibration exemplars. A
    PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity after it.

    Args:
        request: Request parameters including policy and scope

    Returns:
        TaskResult containing CreateContentStandardsResponse with standards_id
    """
    task_name = "create_content_standards"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.create_content_standards(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, CreateContentStandardsResponse)

Create a new content standards configuration.

Defines acceptable content contexts for ad placement using natural language policy and optional calibration exemplars.

Args

request
Request parameters including policy and scope

Returns

TaskResult containing CreateContentStandardsResponse with standards_id

async def create_media_buy(self, request: CreateMediaBuyRequest) ‑> TaskResult[Union[CreateMediaBuyResponse1, CreateMediaBuyResponse2]]
Expand source code
async def create_media_buy(
    self,
    request: CreateMediaBuyRequest,
) -> TaskResult[CreateMediaBuyResponse]:
    """
    Create a new media buy reservation.

    Asks the agent to reserve inventory for a campaign. The agent answers with
    a media_buy_id that tracks the reservation and can be used for later
    updates. A PROTOCOL_REQUEST activity is emitted before the adapter call
    and a PROTOCOL_RESPONSE activity after it.

    Args:
        request: Media buy creation parameters including:
            - brand_manifest: Advertiser brand information and creative assets
            - packages: List of package requests specifying desired inventory
            - publisher_properties: Target properties for ad placement
            - budget: Optional budget constraints
            - start_date/end_date: Campaign flight dates

    Returns:
        TaskResult containing CreateMediaBuyResponse with:
            - media_buy_id: Unique identifier for this reservation
            - status: Current state of the media buy
            - packages: Confirmed package details
            - Additional platform-specific metadata

    Example:
        >>> from adcp import ADCPClient, CreateMediaBuyRequest
        >>> client = ADCPClient(agent_config)
        >>> request = CreateMediaBuyRequest(
        ...     brand_manifest=brand,
        ...     packages=[package_request],
        ...     publisher_properties=properties
        ... )
        >>> result = await client.create_media_buy(request)
        >>> if result.success:
        ...     media_buy_id = result.data.media_buy_id
    """
    task_name = "create_media_buy"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.create_media_buy(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, CreateMediaBuyResponse)

Create a new media buy reservation.

Requests the agent to reserve inventory for a campaign. The agent returns a media_buy_id that tracks this reservation and can be used for updates.

Args

request
Media buy creation parameters including:
- brand_manifest: Advertiser brand information and creative assets
- packages: List of package requests specifying desired inventory
- publisher_properties: Target properties for ad placement
- budget: Optional budget constraints
- start_date/end_date: Campaign flight dates

Returns

TaskResult containing CreateMediaBuyResponse with:
- media_buy_id: Unique identifier for this reservation
- status: Current state of the media buy
- packages: Confirmed package details
- Additional platform-specific metadata

Example

>>> from adcp import ADCPClient, CreateMediaBuyRequest
>>> client = ADCPClient(agent_config)
>>> request = CreateMediaBuyRequest(
...     brand_manifest=brand,
...     packages=[package_request],
...     publisher_properties=properties
... )
>>> result = await client.create_media_buy(request)
>>> if result.success:
...     media_buy_id = result.data.media_buy_id
async def create_property_list(self, request: CreatePropertyListRequest) ‑> TaskResult[CreatePropertyListResponse]
Expand source code
async def create_property_list(
    self,
    request: CreatePropertyListRequest,
) -> TaskResult[CreatePropertyListResponse]:
    """
    Create a property list for governance filtering.

    A property list defines a dynamic set of properties derived from filters,
    brand manifests, and feature requirements. A PROTOCOL_REQUEST activity is
    emitted before the adapter call and a PROTOCOL_RESPONSE activity after it.

    Args:
        request: Request parameters for creating the property list

    Returns:
        TaskResult containing CreatePropertyListResponse with list_id
    """
    task_name = "create_property_list"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.create_property_list(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, CreatePropertyListResponse)

Create a property list for governance filtering.

Property lists define dynamic sets of properties based on filters, brand manifests, and feature requirements.

Args

request
Request parameters for creating the property list

Returns

TaskResult containing CreatePropertyListResponse with list_id

async def delete_property_list(self, request: DeletePropertyListRequest) ‑> TaskResult[DeletePropertyListResponse]
Expand source code
async def delete_property_list(
    self,
    request: DeletePropertyListRequest,
) -> TaskResult[DeletePropertyListResponse]:
    """
    Delete a property list.

    The property list is removed, and any active subscriptions to it will be
    terminated. A PROTOCOL_REQUEST activity is emitted before the adapter call
    and a PROTOCOL_RESPONSE activity after it.

    Args:
        request: Request parameters with list_id

    Returns:
        TaskResult containing DeletePropertyListResponse
    """
    task_name = "delete_property_list"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.delete_property_list(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, DeletePropertyListResponse)

Delete a property list.

Removes a property list. Any active subscriptions to this list will be terminated.

Args

request
Request parameters with list_id

Returns

TaskResult containing DeletePropertyListResponse

async def fetch_capabilities(self) ‑> GetAdcpCapabilitiesResponse
Expand source code
async def fetch_capabilities(self) -> GetAdcpCapabilitiesResponse:
    """Return the seller's capabilities, preferring a still-valid cache entry.

    The cached response is served when one exists and its age, measured with
    ``time.monotonic()``, is below ``capabilities_ttl``. In every other case
    the result of ``refresh_capabilities()`` is returned.

    Returns:
        The seller's capabilities response.
    """
    # No usable cache entry: nothing stored yet, or no fetch time recorded.
    if self._capabilities is None or self._capabilities_fetched_at is None:
        return await self.refresh_capabilities()

    age = time.monotonic() - self._capabilities_fetched_at
    if age < self.capabilities_ttl:
        return self._capabilities

    return await self.refresh_capabilities()

Fetch capabilities, using cache if still valid.

Returns

The seller's capabilities response.

async def get_account_financials(self, request: GetAccountFinancialsRequest) ‑> TaskResult[Union[GetAccountFinancialsResponse1, GetAccountFinancialsResponse2]]
Expand source code
async def get_account_financials(
    self,
    request: GetAccountFinancialsRequest,
) -> TaskResult[GetAccountFinancialsResponse]:
    """
    Get Account Financials.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the adapter result status) after it.

    Args:
        request: Request parameters; serialized with None-valued fields dropped

    Returns:
        TaskResult containing GetAccountFinancialsResponse
    """
    task_name = "get_account_financials"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.get_account_financials(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, GetAccountFinancialsResponse)

Get Account Financials.

Args

request
Request parameters

Returns

TaskResult containing GetAccountFinancialsResponse

async def get_adcp_capabilities(self, request: GetAdcpCapabilitiesRequest) ‑> TaskResult[GetAdcpCapabilitiesResponse]
Expand source code
async def get_adcp_capabilities(
    self,
    request: GetAdcpCapabilitiesRequest,
) -> TaskResult[GetAdcpCapabilitiesResponse]:
    """
    Get AdCP capabilities from the agent.

    Asks the agent which AdCP features, protocol versions, and domain-specific
    capabilities (media_buy, signals, sponsored_intelligence) it supports. A
    PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity after it.

    Args:
        request: Request parameters including optional protocol filters

    Returns:
        TaskResult containing GetAdcpCapabilitiesResponse with:
            - adcp: Core protocol version information
            - supported_protocols: List of supported domain protocols
            - media_buy: Media buy capabilities (if supported)
            - sponsored_intelligence: SI capabilities (if supported)
            - signals: Signals capabilities (if supported)
    """
    task_name = "get_adcp_capabilities"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.get_adcp_capabilities(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, GetAdcpCapabilitiesResponse)

Get AdCP capabilities from the agent.

Queries the agent's supported AdCP features, protocol versions, and domain-specific capabilities (media_buy, signals, sponsored_intelligence).

Args

request
Request parameters including optional protocol filters

Returns

TaskResult containing GetAdcpCapabilitiesResponse with:
- adcp: Core protocol version information
- supported_protocols: List of supported domain protocols
- media_buy: Media buy capabilities (if supported)
- sponsored_intelligence: SI capabilities (if supported)
- signals: Signals capabilities (if supported)

async def get_content_standards(self, request: GetContentStandardsRequest) ‑> TaskResult[Union[GetContentStandardsResponse1, GetContentStandardsResponse2]]
Expand source code
async def get_content_standards(
    self,
    request: GetContentStandardsRequest,
) -> TaskResult[GetContentStandardsResponse]:
    """
    Get a content standards configuration by ID.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the adapter result status) after it.

    Args:
        request: Request parameters including standards_id

    Returns:
        TaskResult containing GetContentStandardsResponse
    """
    task_name = "get_content_standards"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.get_content_standards(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, GetContentStandardsResponse)

Get a content standards configuration by ID.

Args

request
Request parameters including standards_id

Returns

TaskResult containing GetContentStandardsResponse

async def get_creative_delivery(self, request: GetCreativeDeliveryRequest) ‑> TaskResult[GetCreativeDeliveryResponse]
Expand source code
async def get_creative_delivery(
    self,
    request: GetCreativeDeliveryRequest,
) -> TaskResult[GetCreativeDeliveryResponse]:
    """
    Get Creative Delivery.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the adapter result status) after it.

    Args:
        request: Request parameters; serialized with None-valued fields dropped

    Returns:
        TaskResult containing GetCreativeDeliveryResponse
    """
    task_name = "get_creative_delivery"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.get_creative_delivery(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, GetCreativeDeliveryResponse)

Get Creative Delivery.

Args

request
Request parameters

Returns

TaskResult containing GetCreativeDeliveryResponse

async def get_creative_features(self, request: GetCreativeFeaturesRequest) ‑> TaskResult[Union[GetCreativeFeaturesResponse1, GetCreativeFeaturesResponse2]]
Expand source code
async def get_creative_features(
    self,
    request: GetCreativeFeaturesRequest,
) -> TaskResult[GetCreativeFeaturesResponse]:
    """
    Evaluate governance features for a creative manifest.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the adapter result status) after it.

    Args:
        request: Request parameters; serialized with None-valued fields dropped

    Returns:
        TaskResult containing GetCreativeFeaturesResponse
    """
    task_name = "get_creative_features"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.get_creative_features(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, GetCreativeFeaturesResponse)

Evaluate governance features for a creative manifest.

async def get_info(self) ‑> dict[str, typing.Any]
Expand source code
async def get_info(self) -> dict[str, Any]:
    """
    Retrieve agent information, AdCP extension metadata included.

    The agent card information covers:
    - Agent name, description, version
    - Protocol type (mcp or a2a)
    - AdCP version (from extensions.adcp.adcp_version)
    - Supported protocols (from extensions.adcp.protocols_supported)
    - Available tools/skills

    The lookup itself is delegated to ``adapter.get_agent_info()``.

    Returns:
        Dictionary with agent metadata
    """
    agent_info = await self.adapter.get_agent_info()
    return agent_info

Get agent information including AdCP extension metadata.

Returns agent card information including:
- Agent name, description, version
- Protocol type (mcp or a2a)
- AdCP version (from extensions.adcp.adcp_version)
- Supported protocols (from extensions.adcp.protocols_supported)
- Available tools/skills

Returns

Dictionary with agent metadata

async def get_media_buy_artifacts(self, request: GetMediaBuyArtifactsRequest) ‑> TaskResult[Union[GetMediaBuyArtifactsResponse1, GetMediaBuyArtifactsResponse2]]
Expand source code
async def get_media_buy_artifacts(
    self,
    request: GetMediaBuyArtifactsRequest,
) -> TaskResult[GetMediaBuyArtifactsResponse]:
    """
    Get artifacts associated with a media buy.

    Fetches the content artifacts where ads were delivered for a media buy. A
    PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity after it.

    Args:
        request: Request parameters including media_buy_id

    Returns:
        TaskResult containing GetMediaBuyArtifactsResponse
    """
    task_name = "get_media_buy_artifacts"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.get_media_buy_artifacts(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, GetMediaBuyArtifactsResponse)

Get artifacts associated with a media buy.

Retrieves content artifacts where ads were delivered for a media buy.

Args

request
Request parameters including media_buy_id

Returns

TaskResult containing GetMediaBuyArtifactsResponse

async def get_media_buy_delivery(self, request: GetMediaBuyDeliveryRequest) ‑> TaskResult[GetMediaBuyDeliveryResponse]
Expand source code
async def get_media_buy_delivery(
    self,
    request: GetMediaBuyDeliveryRequest,
) -> TaskResult[GetMediaBuyDeliveryResponse]:
    """
    Get Media Buy Delivery.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the adapter result status) after it.

    Args:
        request: Request parameters; serialized with None-valued fields dropped

    Returns:
        TaskResult containing GetMediaBuyDeliveryResponse
    """
    task_name = "get_media_buy_delivery"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.get_media_buy_delivery(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, GetMediaBuyDeliveryResponse)

Get Media Buy Delivery.

Args

request
Request parameters

Returns

TaskResult containing GetMediaBuyDeliveryResponse

async def get_media_buys(self, request: GetMediaBuysRequest) ‑> TaskResult[GetMediaBuysResponse]
Expand source code
async def get_media_buys(
    self,
    request: GetMediaBuysRequest,
) -> TaskResult[GetMediaBuysResponse]:
    """
    Get Media Buys.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the adapter result status) after it.

    Args:
        request: Request parameters; serialized with None-valued fields dropped

    Returns:
        TaskResult containing GetMediaBuysResponse
    """
    task_name = "get_media_buys"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_result = await self.adapter.get_media_buys(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_result.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(adapter_result, GetMediaBuysResponse)

Get Media Buys.

Args

request
Request parameters

Returns

TaskResult containing GetMediaBuysResponse

async def get_plan_audit_logs(self, request: GetPlanAuditLogsRequest) ‑> TaskResult[GetPlanAuditLogsResponse]
Expand source code
async def get_plan_audit_logs(
    self,
    request: GetPlanAuditLogsRequest,
) -> TaskResult[GetPlanAuditLogsResponse]:
    """
    Retrieve governance state and audit logs for one or more plans.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it.

    Args:
        request: Request parameters, serialized with
            ``model_dump(exclude_none=True)`` before being sent.

    Returns:
        TaskResult containing GetPlanAuditLogsResponse
    """
    task_name = "get_plan_audit_logs"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_activity = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_activity)

    raw = await self.adapter.get_plan_audit_logs(payload)

    response_activity = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_activity)

    return self.adapter._parse_response(raw, GetPlanAuditLogsResponse)

Retrieve governance state and audit logs for one or more plans.

async def get_products(self,
request: GetProductsRequest,
fetch_previews: bool = False,
preview_output_format: str = 'url',
creative_agent_client: ADCPClient | None = None) ‑> TaskResult[GetProductsResponse]
Expand source code
async def get_products(
    self,
    request: GetProductsRequest,
    fetch_previews: bool = False,
    preview_output_format: str = "url",
    creative_agent_client: ADCPClient | None = None,
) -> TaskResult[GetProductsResponse]:
    """
    Fetch advertising products, optionally enriched with creative previews.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it.
    When previews are requested and the parsed result is successful with
    data, the preview-enriched products are stored in the result metadata
    under ``"products_with_previews"``.

    Args:
        request: Request parameters, serialized with
            ``model_dump(exclude_none=True)`` before being sent.
        fetch_previews: If True, generate preview URLs for each product's
            formats (uses batch API for 5-10x performance improvement)
        preview_output_format: "url" for iframe URLs (default), "html" for
            direct embedding (2-3x faster, no iframe overhead)
        creative_agent_client: Client for creative agent (required if
            fetch_previews=True)

    Returns:
        TaskResult containing GetProductsResponse with optional preview URLs in metadata

    Raises:
        ValueError: If fetch_previews=True but creative_agent_client is not provided
    """
    # Fail fast, before any activity is emitted or any request is sent.
    if fetch_previews and not creative_agent_client:
        raise ValueError("creative_agent_client is required when fetch_previews=True")

    task_name = "get_products"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_activity = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_activity)

    raw = await self.adapter.get_products(payload)

    response_activity = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_activity)

    result: TaskResult[GetProductsResponse] = self.adapter._parse_response(
        raw, GetProductsResponse
    )

    # The client check is repeated here so the optional argument is known to be set.
    if fetch_previews and result.success and result.data and creative_agent_client:
        # Imported lazily, only when previews are actually requested.
        from adcp.utils.preview_cache import add_preview_urls_to_products

        enriched = await add_preview_urls_to_products(
            result.data.products,
            creative_agent_client,
            use_batch=True,
            output_format=preview_output_format,
        )
        result.metadata = result.metadata or {}
        result.metadata["products_with_previews"] = enriched

    return result

Get advertising products.

Args

request
Request parameters
fetch_previews
If True, generate preview URLs for each product's formats (uses batch API for 5-10x performance improvement)
preview_output_format
"url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead)
creative_agent_client
Client for creative agent (required if fetch_previews=True)

Returns

TaskResult containing GetProductsResponse with optional preview URLs in metadata

Raises

ValueError
If fetch_previews=True but creative_agent_client is not provided
async def get_property_list(self, request: GetPropertyListRequest) ‑> TaskResult[GetPropertyListResponse]
Expand source code
async def get_property_list(
    self,
    request: GetPropertyListRequest,
) -> TaskResult[GetPropertyListResponse]:
    """
    Fetch a property list, optionally resolved to its identifiers.

    When resolve=true, returns the list of resolved property identifiers.
    Use this to get the actual properties that match the list's filters.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it.

    Args:
        request: Request parameters including list_id and resolve flag

    Returns:
        TaskResult containing GetPropertyListResponse with identifiers
    """
    task_name = "get_property_list"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_activity = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_activity)

    raw = await self.adapter.get_property_list(payload)

    response_activity = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_activity)

    return self.adapter._parse_response(raw, GetPropertyListResponse)

Get a property list with optional resolution.

When resolve=true, returns the list of resolved property identifiers. Use this to get the actual properties that match the list's filters.

Args

request
Request parameters including list_id and resolve flag

Returns

TaskResult containing GetPropertyListResponse with identifiers

async def get_signals(self, request: GetSignalsRequest) ‑> TaskResult[GetSignalsResponse]
Expand source code
async def get_signals(
    self,
    request: GetSignalsRequest,
) -> TaskResult[GetSignalsResponse]:
    """
    Fetch signals from the agent.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it.

    Args:
        request: Request parameters, serialized with
            ``model_dump(exclude_none=True)`` before being sent.

    Returns:
        TaskResult containing GetSignalsResponse
    """
    task_name = "get_signals"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_activity = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_activity)

    raw = await self.adapter.get_signals(payload)

    response_activity = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_activity)

    return self.adapter._parse_response(raw, GetSignalsResponse)

Get Signals.

Args

request
Request parameters

Returns

TaskResult containing GetSignalsResponse

def get_webhook_url(self, task_type: str, operation_id: str) ‑> str
Expand source code
def get_webhook_url(self, task_type: str, operation_id: str) -> str:
    """Build the webhook URL for a task from the configured template.

    The template is filled with the ``agent_id``, ``task_type`` and
    ``operation_id`` placeholders.

    Args:
        task_type: Task type substituted for ``{task_type}``.
        operation_id: Operation identifier substituted for ``{operation_id}``.

    Returns:
        The formatted webhook URL.

    Raises:
        ValueError: If no webhook URL template is configured.
    """
    template = self.webhook_url_template
    if not template:
        raise ValueError("webhook_url_template not configured")

    placeholders = {
        "agent_id": self.agent_config.id,
        "task_type": task_type,
        "operation_id": operation_id,
    }
    return template.format(**placeholders)

Generate webhook URL for a task.

async def handle_webhook(self,
payload: dict[str, Any] | Task | TaskStatusUpdateEvent,
task_type: str,
operation_id: str,
signature: str | None = None,
timestamp: str | None = None,
raw_body: bytes | str | None = None) ‑> TaskResult[AdcpAsyncResponseData]
Expand source code
async def handle_webhook(
    self,
    payload: dict[str, Any] | Task | TaskStatusUpdateEvent,
    task_type: str,
    operation_id: str,
    signature: str | None = None,
    timestamp: str | None = None,
    raw_body: bytes | str | None = None,
) -> TaskResult[AdcpAsyncResponseData]:
    """
    Handle an incoming webhook and return a typed result.

    Single entry point for webhooks from both supported protocols:

    - MCP webhooks: HTTP POST with a dict payload and optional HMAC signature
    - A2A webhooks: Task or TaskStatusUpdateEvent objects, depending on status

    The protocol is detected from the payload type: Task and
    TaskStatusUpdateEvent instances go to the A2A handler, anything else to
    the MCP handler. Both protocols return a consistent TaskResult structure
    with typed AdCP response data.

    Args:
        payload: Webhook payload - one of:
            - dict[str, Any]: MCP webhook payload from HTTP POST
            - Task: A2A webhook for terminated statuses (completed, failed)
            - TaskStatusUpdateEvent: A2A webhook for intermediate statuses
              (working, input-required, submitted)
        task_type: Task type from application routing (e.g., "get_products").
            Applications should extract this from the URL routing pattern:
            /webhook/{task_type}/{agent_id}/{operation_id}
        operation_id: Operation identifier from application routing.
            Used to correlate webhook notifications with the original task
            submission.
        signature: Optional HMAC-SHA256 signature for MCP webhook verification
            (X-AdCP-Signature header). Ignored for A2A webhooks.
        timestamp: Optional Unix timestamp (seconds) for MCP webhook signature
            verification (X-AdCP-Timestamp header). Required when signature
            is provided.
        raw_body: Optional raw HTTP request body bytes for signature
            verification. When provided, used directly instead of
            re-serializing the payload, avoiding cross-language JSON
            serialization mismatches. Strongly recommended for production use.

    Returns:
        TaskResult with parsed task-specific response data. The structure
        is identical regardless of protocol.

    Raises:
        ADCPWebhookSignatureError: If MCP signature verification fails
        ValidationError: If MCP payload doesn't match WebhookPayload schema

    Note:
        task_type and operation_id were deprecated from the webhook payload
        per AdCP specification. Applications must extract these from URL
        routing and pass them explicitly.

    Examples:
        MCP webhook (HTTP endpoint):
        >>> @app.post("/webhook/{task_type}/{agent_id}/{operation_id}")
        ... async def webhook_handler(task_type: str, operation_id: str, request: Request):
        ...     raw_body = await request.body()
        ...     payload = json.loads(raw_body)
        ...     signature = request.headers.get("X-AdCP-Signature")
        ...     timestamp = request.headers.get("X-AdCP-Timestamp")
        ...     result = await client.handle_webhook(
        ...         payload, task_type, operation_id, signature, timestamp,
        ...         raw_body=raw_body,
        ...     )
        ...     if result.success:
        ...         print(f"Task completed: {result.data}")

        A2A webhook with Task (terminated status):
        >>> async def on_task_completed(task: Task):
        ...     # Extract task_type and operation_id from your app's task tracking
        ...     task_type = your_task_registry.get_type(task.id)
        ...     operation_id = your_task_registry.get_operation_id(task.id)
        ...     result = await client.handle_webhook(
        ...         task, task_type, operation_id
        ...     )
        ...     if result.success:
        ...         print(f"Task completed: {result.data}")

        A2A webhook with TaskStatusUpdateEvent (intermediate status):
        >>> async def on_task_update(event: TaskStatusUpdateEvent):
        ...     # Extract task_type and operation_id from your app's task tracking
        ...     task_type = your_task_registry.get_type(event.task_id)
        ...     operation_id = your_task_registry.get_operation_id(event.task_id)
        ...     result = await client.handle_webhook(
        ...         event, task_type, operation_id
        ...     )
        ...     if result.status == GeneratedTaskStatus.working:
        ...         print(f"Task still working: {result.metadata.get('message')}")
    """
    # Anything that is not an A2A object is treated as an MCP dict payload.
    if not isinstance(payload, (Task, TaskStatusUpdateEvent)):
        return await self._handle_mcp_webhook(
            payload, task_type, operation_id, signature, timestamp, raw_body
        )

    # A2A webhook: signature, timestamp and raw_body are not forwarded.
    return await self._handle_a2a_webhook(payload, task_type, operation_id)

Handle incoming webhook and return typed result.

This method provides a unified interface for handling webhooks from both MCP and A2A protocols:

  • MCP Webhooks: HTTP POST with dict payload, optional HMAC signature
  • A2A Webhooks: Task or TaskStatusUpdateEvent objects based on status

The method automatically detects the protocol type and routes to the appropriate handler. Both protocols return a consistent TaskResult structure with typed AdCP response data.

Args

payload
Webhook payload, one of: dict[str, Any] (MCP webhook payload from HTTP POST); Task (A2A webhook for terminated statuses: completed, failed); or TaskStatusUpdateEvent (A2A webhook for intermediate statuses: working, input-required, submitted).
task_type
Task type from application routing (e.g., "get_products"). Applications should extract this from URL routing pattern: /webhook/{task_type}/{agent_id}/{operation_id}
operation_id
Operation identifier from application routing. Used to correlate webhook notifications with original task submission.
signature
Optional HMAC-SHA256 signature for MCP webhook verification (X-AdCP-Signature header). Ignored for A2A webhooks.
timestamp
Optional Unix timestamp (seconds) for MCP webhook signature verification (X-AdCP-Timestamp header). Required when signature is provided.
raw_body
Optional raw HTTP request body bytes for signature verification. When provided, used directly instead of re-serializing the payload, avoiding cross-language JSON serialization mismatches. Strongly recommended for production use.

Returns

TaskResult with parsed task-specific response data. The structure is identical regardless of protocol.

Raises

ADCPWebhookSignatureError
If MCP signature verification fails
ValidationError
If MCP payload doesn't match WebhookPayload schema

Note

task_type and operation_id were deprecated from the webhook payload per AdCP specification. Applications must extract these from URL routing and pass them explicitly.

Examples

MCP webhook (HTTP endpoint):

>>> @app.post("/webhook/{task_type}/{agent_id}/{operation_id}")
>>> async def webhook_handler(task_type: str, operation_id: str, request: Request):
>>>     raw_body = await request.body()
>>>     payload = json.loads(raw_body)
>>>     signature = request.headers.get("X-AdCP-Signature")
>>>     timestamp = request.headers.get("X-AdCP-Timestamp")
>>>     result = await client.handle_webhook(
>>>         payload, task_type, operation_id, signature, timestamp,
>>>         raw_body=raw_body,
>>>     )
>>>     if result.success:
>>>         print(f"Task completed: {result.data}")

A2A webhook with Task (terminated status):

>>> async def on_task_completed(task: Task):
>>>     # Extract task_type and operation_id from your app's task tracking
>>>     task_type = your_task_registry.get_type(task.id)
>>>     operation_id = your_task_registry.get_operation_id(task.id)
>>>     result = await client.handle_webhook(
>>>         task, task_type, operation_id
>>>     )
>>>     if result.success:
>>>         print(f"Task completed: {result.data}")

A2A webhook with TaskStatusUpdateEvent (intermediate status):

>>> async def on_task_update(event: TaskStatusUpdateEvent):
>>>     # Extract task_type and operation_id from your app's task tracking
>>>     task_type = your_task_registry.get_type(event.task_id)
>>>     operation_id = your_task_registry.get_operation_id(event.task_id)
>>>     result = await client.handle_webhook(
>>>         event, task_type, operation_id
>>>     )
>>>     if result.status == GeneratedTaskStatus.working:
>>>         print(f"Task still working: {result.metadata.get('message')}")
async def list_accounts(self, request: ListAccountsRequest) ‑> TaskResult[ListAccountsResponse]
Expand source code
async def list_accounts(
    self,
    request: ListAccountsRequest,
) -> TaskResult[ListAccountsResponse]:
    """
    List accounts known to the agent.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it.

    Args:
        request: Request parameters, serialized with
            ``model_dump(exclude_none=True)`` before being sent.

    Returns:
        TaskResult containing ListAccountsResponse
    """
    task_name = "list_accounts"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_activity = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_activity)

    raw = await self.adapter.list_accounts(payload)

    response_activity = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_activity)

    return self.adapter._parse_response(raw, ListAccountsResponse)

List Accounts.

Args

request
Request parameters

Returns

TaskResult containing ListAccountsResponse

async def list_content_standards(self, request: ListContentStandardsRequest) ‑> TaskResult[Union[ListContentStandardsResponse1, ListContentStandardsResponse2]]
Expand source code
async def list_content_standards(
    self,
    request: ListContentStandardsRequest,
) -> TaskResult[ListContentStandardsResponse]:
    """
    List content standards configurations.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it.

    Args:
        request: Request parameters including optional filters

    Returns:
        TaskResult containing ListContentStandardsResponse
    """
    task_name = "list_content_standards"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_activity = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_activity)

    raw = await self.adapter.list_content_standards(payload)

    response_activity = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_activity)

    return self.adapter._parse_response(raw, ListContentStandardsResponse)

List content standards configurations.

Args

request
Request parameters including optional filters

Returns

TaskResult containing ListContentStandardsResponse

async def list_creative_formats(self,
request: ListCreativeFormatsRequest,
fetch_previews: bool = False,
preview_output_format: str = 'url') ‑> TaskResult[ListCreativeFormatsResponse]
Expand source code
async def list_creative_formats(
    self,
    request: ListCreativeFormatsRequest,
    fetch_previews: bool = False,
    preview_output_format: str = "url",
) -> TaskResult[ListCreativeFormatsResponse]:
    """
    List supported creative formats, optionally enriched with previews.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it.
    When previews are requested and the parsed result is successful with
    data, the preview-enriched formats are stored in the result metadata
    under ``"formats_with_previews"``; this client itself is passed to the
    preview helper.

    Args:
        request: Request parameters, serialized with
            ``model_dump(exclude_none=True)`` before being sent.
        fetch_previews: If True, generate preview URLs for each format using
            sample manifests (uses batch API for 5-10x performance improvement)
        preview_output_format: "url" for iframe URLs (default), "html" for
            direct embedding (2-3x faster, no iframe overhead)

    Returns:
        TaskResult containing ListCreativeFormatsResponse with optional preview URLs in metadata
    """
    task_name = "list_creative_formats"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_activity = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_activity)

    raw = await self.adapter.list_creative_formats(payload)

    response_activity = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_activity)

    result: TaskResult[ListCreativeFormatsResponse] = self.adapter._parse_response(
        raw, ListCreativeFormatsResponse
    )

    if fetch_previews and result.success and result.data:
        # Imported lazily, only when previews are actually requested.
        from adcp.utils.preview_cache import add_preview_urls_to_formats

        enriched = await add_preview_urls_to_formats(
            result.data.formats,
            self,
            use_batch=True,
            output_format=preview_output_format,
        )
        result.metadata = result.metadata or {}
        result.metadata["formats_with_previews"] = enriched

    return result

List supported creative formats.

Args

request
Request parameters
fetch_previews
If True, generate preview URLs for each format using sample manifests (uses batch API for 5-10x performance improvement)
preview_output_format
"url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead)

Returns

TaskResult containing ListCreativeFormatsResponse with optional preview URLs in metadata

async def list_creatives(self, request: ListCreativesRequest) ‑> TaskResult[ListCreativesResponse]
Expand source code
async def list_creatives(
    self,
    request: ListCreativesRequest,
) -> TaskResult[ListCreativesResponse]:
    """
    List creatives from the agent.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it.

    Args:
        request: Request parameters, serialized with
            ``model_dump(exclude_none=True)`` before being sent.

    Returns:
        TaskResult containing ListCreativesResponse
    """
    task_name = "list_creatives"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_activity = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_activity)

    raw = await self.adapter.list_creatives(payload)

    response_activity = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_activity)

    return self.adapter._parse_response(raw, ListCreativesResponse)

List Creatives.

Args

request
Request parameters

Returns

TaskResult containing ListCreativesResponse

async def list_property_lists(self, request: ListPropertyListsRequest) ‑> TaskResult[ListPropertyListsResponse]
Expand source code
async def list_property_lists(
    self,
    request: ListPropertyListsRequest,
) -> TaskResult[ListPropertyListsResponse]:
    """
    List property lists owned by a principal.

    Retrieves metadata for all property lists, optionally filtered
    by principal or pagination parameters.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it.

    Args:
        request: Request parameters with optional filtering

    Returns:
        TaskResult containing ListPropertyListsResponse
    """
    task_name = "list_property_lists"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_activity = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_activity)

    raw = await self.adapter.list_property_lists(payload)

    response_activity = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_activity)

    return self.adapter._parse_response(raw, ListPropertyListsResponse)

List property lists owned by a principal.

Retrieves metadata for all property lists, optionally filtered by principal or pagination parameters.

Args

request
Request parameters with optional filtering

Returns

TaskResult containing ListPropertyListsResponse

async def list_tools(self) ‑> list[str]
Expand source code
async def list_tools(self) -> list[str]:
    """
    Ask the agent which tools it exposes.

    Delegates directly to the protocol adapter.

    Returns:
        List of tool names
    """
    tool_names = await self.adapter.list_tools()
    return tool_names

List available tools from the agent.

Returns

List of tool names

async def log_event(self, request: LogEventRequest) ‑> TaskResult[Union[LogEventResponse1, LogEventResponse2]]
Expand source code
async def log_event(
    self,
    request: LogEventRequest,
) -> TaskResult[LogEventResponse]:
    """
    Log an event with the agent.

    Task feature validation runs first, before an operation id is created
    or any activity is emitted. A PROTOCOL_REQUEST activity is then emitted
    before the adapter call and a PROTOCOL_RESPONSE activity (carrying the
    raw result status) after it.

    Args:
        request: Request parameters, serialized with
            ``model_dump(exclude_none=True)`` before being sent.

    Returns:
        TaskResult containing LogEventResponse
    """
    task_name = "log_event"
    self._validate_task_features(task_name)

    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_activity = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_activity)

    raw = await self.adapter.log_event(payload)

    response_activity = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_activity)

    return self.adapter._parse_response(raw, LogEventResponse)

Log Event.

Args

request
Request parameters

Returns

TaskResult containing LogEventResponse

async def preview_creative(self, request: PreviewCreativeRequest) ‑> TaskResult[Union[PreviewCreativeResponse1, PreviewCreativeResponse2, PreviewCreativeResponse3]]
Expand source code
async def preview_creative(
    self,
    request: PreviewCreativeRequest,
) -> TaskResult[PreviewCreativeResponse]:
    """
    Generate a preview of a creative manifest.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it.

    Args:
        request: Request parameters, serialized with
            ``model_dump(exclude_none=True)`` before being sent.

    Returns:
        TaskResult containing PreviewCreativeResponse with preview URLs
    """
    task_name = "preview_creative"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_activity = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_activity)

    raw = await self.adapter.preview_creative(payload)

    response_activity = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_activity)

    return self.adapter._parse_response(raw, PreviewCreativeResponse)

Generate preview of a creative manifest.

Args

request
Request parameters

Returns

TaskResult containing PreviewCreativeResponse with preview URLs

async def provide_performance_feedback(self, request: ProvidePerformanceFeedbackRequest) ‑> TaskResult[Union[ProvidePerformanceFeedbackResponse1, ProvidePerformanceFeedbackResponse2]]
Expand source code
async def provide_performance_feedback(
    self,
    request: ProvidePerformanceFeedbackRequest,
) -> TaskResult[ProvidePerformanceFeedbackResponse]:
    """
    Send performance feedback to the agent.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it.

    Args:
        request: Request parameters, serialized with
            ``model_dump(exclude_none=True)`` before being sent.

    Returns:
        TaskResult containing ProvidePerformanceFeedbackResponse
    """
    task_name = "provide_performance_feedback"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_activity = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_activity)

    raw = await self.adapter.provide_performance_feedback(payload)

    response_activity = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_activity)

    return self.adapter._parse_response(raw, ProvidePerformanceFeedbackResponse)

Provide Performance Feedback.

Args

request
Request parameters

Returns

TaskResult containing ProvidePerformanceFeedbackResponse

async def refresh_capabilities(self) ‑> GetAdcpCapabilitiesResponse
Expand source code
async def refresh_capabilities(self) -> GetAdcpCapabilitiesResponse:
    """Fetch capabilities from the seller, bypassing cache.

    On success the cached capabilities, the feature resolver built from
    them, and the fetch time (monotonic clock) are all replaced.

    Returns:
        The seller's capabilities response.

    Raises:
        ADCPError: If the capabilities call did not succeed or returned no data.
    """
    result = await self.get_adcp_capabilities(GetAdcpCapabilitiesRequest())

    # Bail out first so the cached state is untouched on failure.
    if not result.success or result.data is None:
        raise ADCPError(
            f"Failed to fetch capabilities: {result.error or result.message}",
            agent_id=self.agent_config.id,
            agent_uri=self.agent_config.agent_uri,
        )

    fresh = result.data
    self._capabilities = fresh
    self._feature_resolver = FeatureResolver(fresh)
    self._capabilities_fetched_at = time.monotonic()
    return self._capabilities

Fetch capabilities from the seller, bypassing cache.

Returns

The seller's capabilities response.

async def report_plan_outcome(self, request: ReportPlanOutcomeRequest) ‑> TaskResult[ReportPlanOutcomeResponse]
Expand source code
async def report_plan_outcome(
    self,
    request: ReportPlanOutcomeRequest,
) -> TaskResult[ReportPlanOutcomeResponse]:
    """
    Report the outcome of a governed action to the governance agent.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result status) after it.

    Args:
        request: Request parameters, serialized with
            ``model_dump(exclude_none=True)`` before being sent.

    Returns:
        TaskResult containing ReportPlanOutcomeResponse
    """
    task_name = "report_plan_outcome"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_activity = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_activity)

    raw = await self.adapter.report_plan_outcome(payload)

    response_activity = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_activity)

    return self.adapter._parse_response(raw, ReportPlanOutcomeResponse)

Report the outcome of a governed action to the governance agent.

async def report_usage(self, request: ReportUsageRequest) ‑> TaskResult[ReportUsageResponse]
Expand source code
async def report_usage(
    self,
    request: ReportUsageRequest,
) -> TaskResult[ReportUsageResponse]:
    """
    Send a ``report_usage`` task to the agent and parse its reply.

    Emits a PROTOCOL_REQUEST activity before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request model; serialized with ``model_dump(exclude_none=True)``.

    Returns:
        TaskResult containing ReportUsageResponse
    """
    task_name = "report_usage"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.report_usage(payload)

    # Same operation id ties the response event to the request event.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, ReportUsageResponse)

Report Usage.

Args

request
Request parameters

Returns

TaskResult containing ReportUsageResponse

def require(self, *features: str) ‑> None
Expand source code
def require(self, *features: str) -> None:
    """Assert that the seller supports every feature listed.

    Delegates to the resolver returned by ``_ensure_resolver()``, passing
    this agent's id and URI along as keyword arguments.

    Args:
        *features: Feature identifiers to require.

    Raises:
        ADCPFeatureUnsupportedError: If any features are not supported.
        ADCPError: If capabilities have not been fetched yet.
    """
    resolver = self._ensure_resolver()
    config = self.agent_config
    resolver.require(*features, agent_id=config.id, agent_uri=config.agent_uri)

Assert that the seller supports all listed features.

Args

*features
Feature identifiers to require.

Raises

ADCPFeatureUnsupportedError
If any features are not supported.
ADCPError
If capabilities have not been fetched yet.
async def si_get_offering(self, request: SiGetOfferingRequest) ‑> TaskResult[SiGetOfferingResponse]
Expand source code
async def si_get_offering(
    self,
    request: SiGetOfferingRequest,
) -> TaskResult[SiGetOfferingResponse]:
    """
    Get sponsored intelligence offering.

    Retrieves product/service offerings that can be presented in a
    sponsored intelligence session. Emits a PROTOCOL_REQUEST activity
    before the adapter call and a PROTOCOL_RESPONSE activity after it.

    Args:
        request: Request parameters including brand context; serialized
            with ``model_dump(exclude_none=True)``.

    Returns:
        TaskResult containing SiGetOfferingResponse
    """
    task_name = "si_get_offering"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.si_get_offering(payload)

    # Same operation id ties the response event to the request event.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, SiGetOfferingResponse)

Get sponsored intelligence offering.

Retrieves product/service offerings that can be presented in a sponsored intelligence session.

Args

request
Request parameters including brand context

Returns

TaskResult containing SiGetOfferingResponse

async def si_initiate_session(self, request: SiInitiateSessionRequest) ‑> TaskResult[SiInitiateSessionResponse]
Expand source code
async def si_initiate_session(
    self,
    request: SiInitiateSessionRequest,
) -> TaskResult[SiInitiateSessionResponse]:
    """
    Initiate a sponsored intelligence session.

    Starts a conversational brand experience session with a user. Emits a
    PROTOCOL_REQUEST activity before the adapter call and a
    PROTOCOL_RESPONSE activity after it.

    Args:
        request: Request parameters including identity and context;
            serialized with ``model_dump(exclude_none=True)``.

    Returns:
        TaskResult containing SiInitiateSessionResponse with session_id
    """
    task_name = "si_initiate_session"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.si_initiate_session(payload)

    # Same operation id ties the response event to the request event.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, SiInitiateSessionResponse)

Initiate a sponsored intelligence session.

Starts a conversational brand experience session with a user.

Args

request
Request parameters including identity and context

Returns

TaskResult containing SiInitiateSessionResponse with session_id

async def si_send_message(self, request: SiSendMessageRequest) ‑> TaskResult[SiSendMessageResponse]
Expand source code
async def si_send_message(
    self,
    request: SiSendMessageRequest,
) -> TaskResult[SiSendMessageResponse]:
    """
    Send a message in a sponsored intelligence session.

    Continues the conversation in an active SI session. Emits a
    PROTOCOL_REQUEST activity before the adapter call and a
    PROTOCOL_RESPONSE activity after it.

    Args:
        request: Request parameters including session_id and message;
            serialized with ``model_dump(exclude_none=True)``.

    Returns:
        TaskResult containing SiSendMessageResponse with brand response
    """
    task_name = "si_send_message"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.si_send_message(payload)

    # Same operation id ties the response event to the request event.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, SiSendMessageResponse)

Send a message in a sponsored intelligence session.

Continues the conversation in an active SI session.

Args

request
Request parameters including session_id and message

Returns

TaskResult containing SiSendMessageResponse with brand response

async def si_terminate_session(self, request: SiTerminateSessionRequest) ‑> TaskResult[SiTerminateSessionResponse]
Expand source code
async def si_terminate_session(
    self,
    request: SiTerminateSessionRequest,
) -> TaskResult[SiTerminateSessionResponse]:
    """
    Terminate a sponsored intelligence session.

    Ends an active SI session, optionally with follow-up actions. Emits a
    PROTOCOL_REQUEST activity before the adapter call and a
    PROTOCOL_RESPONSE activity after it.

    Args:
        request: Request parameters including session_id and termination
            context; serialized with ``model_dump(exclude_none=True)``.

    Returns:
        TaskResult containing SiTerminateSessionResponse
    """
    task_name = "si_terminate_session"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.si_terminate_session(payload)

    # Same operation id ties the response event to the request event.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, SiTerminateSessionResponse)

Terminate a sponsored intelligence session.

Ends an active SI session, optionally with follow-up actions.

Args

request
Request parameters including session_id and termination context

Returns

TaskResult containing SiTerminateSessionResponse

def supports(self, feature: str) ‑> bool
Expand source code
def supports(self, feature: str) -> bool:
    """Report whether the seller declares support for ``feature``.

    The lookup is delegated to the resolver returned by
    ``_ensure_resolver()``. Recognized feature namespaces:

    - Protocol support: ``supports("media_buy")`` checks ``supported_protocols``
    - Extension support: ``supports("ext:scope3")`` checks ``extensions_supported``
    - Targeting: ``supports("targeting.geo_countries")`` checks
      ``media_buy.execution.targeting``
    - Media buy features: ``supports("audience_targeting")`` checks
      ``media_buy.features``
    - Signals features: ``supports("catalog_signals")`` checks
      ``signals.features``

    Args:
        feature: Feature identifier to check.

    Returns:
        True if the seller declares the feature as supported.

    Raises:
        ADCPError: If capabilities have not been fetched yet.
    """
    resolver = self._ensure_resolver()
    return resolver.supports(feature)

Check if the seller supports a feature.

Supports multiple feature namespaces:

- Protocol support: supports("media_buy") checks supported_protocols
- Extension support: supports("ext:scope3") checks extensions_supported
- Targeting: supports("targeting.geo_countries") checks media_buy.execution.targeting
- Media buy features: supports("audience_targeting") checks media_buy.features
- Signals features: supports("catalog_signals") checks signals.features

Args

feature
Feature identifier to check.

Returns

True if the seller declares the feature as supported.

Raises

ADCPError
If capabilities have not been fetched yet.
async def sync_accounts(self, request: SyncAccountsRequest) ‑> TaskResult[Union[SyncAccountsResponse1, SyncAccountsResponse2]]
Expand source code
async def sync_accounts(
    self,
    request: SyncAccountsRequest,
) -> TaskResult[SyncAccountsResponse]:
    """
    Send a ``sync_accounts`` task to the agent and parse its reply.

    Emits a PROTOCOL_REQUEST activity before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request model; serialized with ``model_dump(exclude_none=True)``.

    Returns:
        TaskResult containing SyncAccountsResponse
    """
    task_name = "sync_accounts"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.sync_accounts(payload)

    # Same operation id ties the response event to the request event.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, SyncAccountsResponse)

Sync Accounts.

Args

request
Request parameters

Returns

TaskResult containing SyncAccountsResponse

async def sync_audiences(self, request: SyncAudiencesRequest) ‑> TaskResult[Union[SyncAudiencesResponse1, SyncAudiencesResponse2]]
Expand source code
async def sync_audiences(
    self,
    request: SyncAudiencesRequest,
) -> TaskResult[SyncAudiencesResponse]:
    """
    Sync Audiences.

    Runs the client's task-feature validation hook first (the same hook
    ``sync_event_sources`` uses), then emits a PROTOCOL_REQUEST activity,
    calls the adapter, and emits a PROTOCOL_RESPONSE activity carrying the
    raw result's status.

    Args:
        request: Request model; serialized with ``model_dump(exclude_none=True)``.

    Returns:
        TaskResult containing SyncAudiencesResponse

    Raises:
        ADCPFeatureUnsupportedError: Presumably raised by the validation hook
            when ``validate_features`` is enabled and the seller lacks the
            required feature -- the hook is defined elsewhere; confirm there.
    """
    # The constructor documents sync_audiences as a feature-gated task
    # (requires audience_targeting), so validate before any activity is
    # emitted or any network call is made.
    self._validate_task_features("sync_audiences")
    operation_id = create_operation_id()
    params = request.model_dump(exclude_none=True)

    self._emit_activity(
        Activity(
            type=ActivityType.PROTOCOL_REQUEST,
            operation_id=operation_id,
            agent_id=self.agent_config.id,
            task_type="sync_audiences",
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
    )

    raw_result = await self.adapter.sync_audiences(params)

    # Same operation id ties the response event to the request event.
    self._emit_activity(
        Activity(
            type=ActivityType.PROTOCOL_RESPONSE,
            operation_id=operation_id,
            agent_id=self.agent_config.id,
            task_type="sync_audiences",
            status=raw_result.status,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
    )

    return self.adapter._parse_response(raw_result, SyncAudiencesResponse)

Sync Audiences.

Args

request
Request parameters

Returns

TaskResult containing SyncAudiencesResponse

async def sync_catalogs(self, request: SyncCatalogsRequest) ‑> TaskResult[Union[SyncCatalogsResponse1, SyncCatalogsResponse2]]
Expand source code
async def sync_catalogs(
    self,
    request: SyncCatalogsRequest,
) -> TaskResult[SyncCatalogsResponse]:
    """
    Send a ``sync_catalogs`` task to the agent and parse its reply.

    Emits a PROTOCOL_REQUEST activity before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request model; serialized with ``model_dump(exclude_none=True)``.

    Returns:
        TaskResult containing SyncCatalogsResponse
    """
    task_name = "sync_catalogs"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.sync_catalogs(payload)

    # Same operation id ties the response event to the request event.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, SyncCatalogsResponse)

Sync Catalogs.

Args

request
Request parameters

Returns

TaskResult containing SyncCatalogsResponse

async def sync_creatives(self, request: SyncCreativesRequest) ‑> TaskResult[Union[SyncCreativesResponse1, SyncCreativesResponse2]]
Expand source code
async def sync_creatives(
    self,
    request: SyncCreativesRequest,
) -> TaskResult[SyncCreativesResponse]:
    """
    Send a ``sync_creatives`` task to the agent and parse its reply.

    Emits a PROTOCOL_REQUEST activity before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request model; serialized with ``model_dump(exclude_none=True)``.

    Returns:
        TaskResult containing SyncCreativesResponse
    """
    task_name = "sync_creatives"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.sync_creatives(payload)

    # Same operation id ties the response event to the request event.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, SyncCreativesResponse)

Sync Creatives.

Args

request
Request parameters

Returns

TaskResult containing SyncCreativesResponse

async def sync_event_sources(self, request: SyncEventSourcesRequest) ‑> TaskResult[Union[SyncEventSourcesResponse1, SyncEventSourcesResponse2]]
Expand source code
async def sync_event_sources(
    self,
    request: SyncEventSourcesRequest,
) -> TaskResult[SyncEventSourcesResponse]:
    """
    Send a ``sync_event_sources`` task to the agent and parse its reply.

    Calls ``_validate_task_features`` for this task first, then emits a
    PROTOCOL_REQUEST activity, calls the adapter, and emits a
    PROTOCOL_RESPONSE activity carrying the raw result's status.

    Args:
        request: Request model; serialized with ``model_dump(exclude_none=True)``.

    Returns:
        TaskResult containing SyncEventSourcesResponse
    """
    task_name = "sync_event_sources"
    # Validation runs before an operation id is allocated or anything is emitted.
    self._validate_task_features(task_name)
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.sync_event_sources(payload)

    # Same operation id ties the response event to the request event.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, SyncEventSourcesResponse)

Sync Event Sources.

Args

request
Request parameters

Returns

TaskResult containing SyncEventSourcesResponse

async def sync_plans(self, request: SyncPlansRequest) ‑> TaskResult[SyncPlansResponse]
Expand source code
async def sync_plans(
    self,
    request: SyncPlansRequest,
) -> TaskResult[SyncPlansResponse]:
    """Sync campaign governance plans to the governance agent.

    Emits a PROTOCOL_REQUEST activity before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request model; serialized with ``model_dump(exclude_none=True)``.

    Returns:
        TaskResult containing SyncPlansResponse
    """
    task_name = "sync_plans"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.sync_plans(payload)

    # Same operation id ties the response event to the request event.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, SyncPlansResponse)

Sync campaign governance plans to the governance agent.

async def update_content_standards(self, request: UpdateContentStandardsRequest) ‑> TaskResult[Union[UpdateContentStandardsResponse1, UpdateContentStandardsResponse2]]
Expand source code
async def update_content_standards(
    self,
    request: UpdateContentStandardsRequest,
) -> TaskResult[UpdateContentStandardsResponse]:
    """
    Update a content standards configuration.

    Emits a PROTOCOL_REQUEST activity before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw result's status) after it.

    Args:
        request: Request parameters including standards_id and updates;
            serialized with ``model_dump(exclude_none=True)``.

    Returns:
        TaskResult containing UpdateContentStandardsResponse
    """
    task_name = "update_content_standards"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.update_content_standards(payload)

    # Same operation id ties the response event to the request event.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, UpdateContentStandardsResponse)

Update a content standards configuration.

Args

request
Request parameters including standards_id and updates

Returns

TaskResult containing UpdateContentStandardsResponse

async def update_media_buy(self, request: UpdateMediaBuyRequest) ‑> TaskResult[Union[UpdateMediaBuyResponse1, UpdateMediaBuyResponse2]]
Expand source code
async def update_media_buy(
    self,
    request: UpdateMediaBuyRequest,
) -> TaskResult[UpdateMediaBuyResponse]:
    """
    Update an existing media buy reservation.

    Modifies a previously created media buy by updating packages or publisher
    properties. The update operation uses discriminated unions to specify what
    to change - either package details or targeting properties.

    The call emits a PROTOCOL_REQUEST activity before the adapter is invoked
    and a PROTOCOL_RESPONSE activity (with the raw result's status) after.

    Args:
        request: Media buy update parameters including:
            - media_buy_id: Identifier from create_media_buy response
            - updates: Discriminated union specifying update type:
                * UpdateMediaBuyPackagesRequest: Modify package selections
                * UpdateMediaBuyPropertiesRequest: Change targeting properties

    Returns:
        TaskResult containing UpdateMediaBuyResponse with:
            - media_buy_id: The updated media buy identifier
            - status: Updated state of the media buy
            - packages: Updated package configurations
            - Additional platform-specific metadata

    Example:
        >>> from adcp import ADCPClient, UpdateMediaBuyPackagesRequest
        >>> client = ADCPClient(agent_config)
        >>> request = UpdateMediaBuyPackagesRequest(
        ...     media_buy_id="mb_123",
        ...     packages=[updated_package]
        ... )
        >>> result = await client.update_media_buy(request)
        >>> if result.success:
        ...     updated_packages = result.data.packages
    """
    task_name = "update_media_buy"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.update_media_buy(payload)

    # Same operation id ties the response event to the request event.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, UpdateMediaBuyResponse)

Update an existing media buy reservation.

Modifies a previously created media buy by updating packages or publisher properties. The update operation uses discriminated unions to specify what to change - either package details or targeting properties.

Args

request
Media buy update parameters including:

- media_buy_id: Identifier from create_media_buy response
- updates: Discriminated union specifying update type:
    * UpdateMediaBuyPackagesRequest: Modify package selections
    * UpdateMediaBuyPropertiesRequest: Change targeting properties

Returns

TaskResult containing UpdateMediaBuyResponse with:

- media_buy_id: The updated media buy identifier
- status: Updated state of the media buy
- packages: Updated package configurations
- Additional platform-specific metadata

Example

>>> from adcp import ADCPClient, UpdateMediaBuyPackagesRequest
>>> client = ADCPClient(agent_config)
>>> request = UpdateMediaBuyPackagesRequest(
...     media_buy_id="mb_123",
...     packages=[updated_package]
... )
>>> result = await client.update_media_buy(request)
>>> if result.success:
...     updated_packages = result.data.packages
async def update_property_list(self, request: UpdatePropertyListRequest) ‑> TaskResult[UpdatePropertyListResponse]
Expand source code
async def update_property_list(
    self,
    request: UpdatePropertyListRequest,
) -> TaskResult[UpdatePropertyListResponse]:
    """
    Update a property list.

    Modifies the filters, brand manifest, or other parameters
    of an existing property list. Emits a PROTOCOL_REQUEST activity before
    the adapter call and a PROTOCOL_RESPONSE activity after it.

    Args:
        request: Request parameters with list_id and updates; serialized
            with ``model_dump(exclude_none=True)``.

    Returns:
        TaskResult containing UpdatePropertyListResponse
    """
    task_name = "update_property_list"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.update_property_list(payload)

    # Same operation id ties the response event to the request event.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, UpdatePropertyListResponse)

Update a property list.

Modifies the filters, brand manifest, or other parameters of an existing property list.

Args

request
Request parameters with list_id and updates

Returns

TaskResult containing UpdatePropertyListResponse

async def validate_content_delivery(self, request: ValidateContentDeliveryRequest) ‑> TaskResult[Union[ValidateContentDeliveryResponse1, ValidateContentDeliveryResponse2]]
Expand source code
async def validate_content_delivery(
    self,
    request: ValidateContentDeliveryRequest,
) -> TaskResult[ValidateContentDeliveryResponse]:
    """
    Validate content delivery against standards.

    Validates that ad delivery records comply with content standards.
    Emits a PROTOCOL_REQUEST activity before the adapter call and a
    PROTOCOL_RESPONSE activity after it.

    Args:
        request: Request parameters including delivery records; serialized
            with ``model_dump(exclude_none=True)``.

    Returns:
        TaskResult containing ValidateContentDeliveryResponse
    """
    task_name = "validate_content_delivery"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.validate_content_delivery(payload)

    # Same operation id ties the response event to the request event.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, ValidateContentDeliveryResponse)

Validate content delivery against standards.

Validates that ad delivery records comply with content standards.

Args

request
Request parameters including delivery records

Returns

TaskResult containing ValidateContentDeliveryResponse

class ADCPMultiAgentClient (agents: list[AgentConfig],
webhook_url_template: str | None = None,
webhook_secret: str | None = None,
on_activity: Callable[[Activity], None] | None = None,
handlers: dict[str, Callable[..., Any]] | None = None)
Expand source code
class ADCPMultiAgentClient:
    """Client for managing multiple AdCP agents.

    Holds one ``ADCPClient`` per configured agent, keyed by agent id, and
    offers helpers to address a single agent or fan a request out to all.
    """

    def __init__(
        self,
        agents: list[AgentConfig],
        webhook_url_template: str | None = None,
        webhook_secret: str | None = None,
        on_activity: Callable[[Activity], None] | None = None,
        handlers: dict[str, Callable[..., Any]] | None = None,
    ):
        """
        Initialize multi-agent client.

        Args:
            agents: List of agent configurations
            webhook_url_template: Template for webhook URLs
            webhook_secret: Secret for webhook verification
            on_activity: Callback for activity events
            handlers: Task completion handlers
        """
        # One ADCPClient per agent, keyed by its id; all clients share the
        # same webhook settings and activity callback. Later entries with a
        # duplicate id replace earlier ones (dict-comprehension semantics).
        self.agents = {
            agent.id: ADCPClient(
                agent,
                webhook_url_template=webhook_url_template,
                webhook_secret=webhook_secret,
                on_activity=on_activity,
            )
            for agent in agents
        }
        self.handlers = handlers or {}

    def agent(self, agent_id: str) -> ADCPClient:
        """Get client for specific agent.

        Args:
            agent_id: Id of a configured agent.

        Returns:
            The ``ADCPClient`` registered under ``agent_id``.

        Raises:
            ValueError: If no agent with that id is configured.
        """
        if agent_id not in self.agents:
            raise ValueError(f"Agent not found: {agent_id}")
        return self.agents[agent_id]

    @property
    def agent_ids(self) -> list[str]:
        """Get list of agent IDs."""
        return list(self.agents.keys())

    async def close(self) -> None:
        """Close all agent clients and clean up resources.

        Clients are closed concurrently and on a best-effort basis: a failure
        closing one client never prevents the others from closing and is never
        raised to the caller. Each failure is logged as a warning so it is not
        lost silently.
        """
        import asyncio

        logger.debug("Closing all agent clients in multi-agent client")
        # Snapshot ids and clients together so each outcome can be attributed.
        entries = list(self.agents.items())
        outcomes = await asyncio.gather(
            *(client.close() for _, client in entries),
            return_exceptions=True,
        )
        for (agent_id, _), outcome in zip(entries, outcomes):
            if isinstance(outcome, BaseException):
                logger.warning(
                    "Error closing client for agent %s: %r", agent_id, outcome
                )

    async def __aenter__(self) -> ADCPMultiAgentClient:
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Async context manager exit; closes every agent client."""
        await self.close()

    async def get_products(
        self,
        request: GetProductsRequest,
    ) -> list[TaskResult[GetProductsResponse]]:
        """
        Execute get_products across all agents in parallel.

        Args:
            request: Request parameters

        Returns:
            List of TaskResults containing GetProductsResponse for each agent,
            in the iteration order of ``self.agents``.
        """
        import asyncio

        tasks = [agent.get_products(request) for agent in self.agents.values()]
        # No return_exceptions here: an exception raised by any agent's call
        # propagates to the caller.
        return await asyncio.gather(*tasks)

    @classmethod
    def from_env(cls) -> ADCPMultiAgentClient:
        """Create client from environment variables.

        Reads ``ADCP_AGENTS`` (JSON; each element is expanded as keyword
        arguments to ``AgentConfig``) plus the optional
        ``WEBHOOK_URL_TEMPLATE`` and ``WEBHOOK_SECRET`` variables.

        Raises:
            ValueError: If ``ADCP_AGENTS`` is unset or empty.
        """
        agents_json = os.getenv("ADCP_AGENTS")
        if not agents_json:
            raise ValueError("ADCP_AGENTS environment variable not set")

        agents_data = json.loads(agents_json)
        agents = [AgentConfig(**agent) for agent in agents_data]

        return cls(
            agents=agents,
            webhook_url_template=os.getenv("WEBHOOK_URL_TEMPLATE"),
            webhook_secret=os.getenv("WEBHOOK_SECRET"),
        )

Client for managing multiple AdCP agents.

Initialize multi-agent client.

Args

agents
List of agent configurations
webhook_url_template
Template for webhook URLs
webhook_secret
Secret for webhook verification
on_activity
Callback for activity events
handlers
Task completion handlers

Static methods

def from_env() ‑> ADCPMultiAgentClient

Create client from environment variables.

Instance variables

prop agent_ids : list[str]
Expand source code
@property
def agent_ids(self) -> list[str]:
    """Return the ids of all configured agents, in ``self.agents`` order."""
    return [*self.agents]

Get list of agent IDs.

Methods

def agent(self, agent_id: str) ‑> ADCPClient
Expand source code
def agent(self, agent_id: str) -> ADCPClient:
    """Return the client registered for ``agent_id``.

    Raises:
        ValueError: If no agent with that id is configured.
    """
    if agent_id in self.agents:
        return self.agents[agent_id]
    raise ValueError(f"Agent not found: {agent_id}")

Get client for specific agent.

async def close(self) ‑> None
Expand source code
async def close(self) -> None:
    """Close all agent clients and clean up resources.

    Clients are closed concurrently and on a best-effort basis: a failure
    closing one client never prevents the others from closing and is never
    raised to the caller. Each failure is logged as a warning so it is not
    lost silently.
    """
    import asyncio

    logger.debug("Closing all agent clients in multi-agent client")
    # Snapshot ids and clients together so each outcome can be attributed.
    entries = list(self.agents.items())
    outcomes = await asyncio.gather(
        *(client.close() for _, client in entries),
        return_exceptions=True,
    )
    for (agent_id, _), outcome in zip(entries, outcomes):
        if isinstance(outcome, BaseException):
            logger.warning(
                "Error closing client for agent %s: %r", agent_id, outcome
            )

Close all agent clients and clean up resources.

async def get_products(self, request: GetProductsRequest) ‑> list[TaskResult[GetProductsResponse]]
Expand source code
async def get_products(
    self,
    request: GetProductsRequest,
) -> list[TaskResult[GetProductsResponse]]:
    """
    Fan ``get_products`` out to every configured agent concurrently.

    Args:
        request: Request parameters, passed unchanged to each agent client.

    Returns:
        List of TaskResults containing GetProductsResponse for each agent,
        in the iteration order of ``self.agents``.
    """
    import asyncio

    # One coroutine per agent client; gather preserves submission order.
    pending = (client.get_products(request) for client in self.agents.values())
    return await asyncio.gather(*pending)

Execute get_products across all agents in parallel.

Args

request
Request parameters

Returns

List of TaskResults containing GetProductsResponse for each agent