Module adcp.client
Classes
class ADCPClient (agent_config: AgentConfig,
webhook_url_template: str | None = None,
webhook_secret: str | None = None,
on_activity: Callable[[Activity], None] | None = None,
webhook_timestamp_tolerance: int = 300,
capabilities_ttl: float = 3600.0,
validate_features: bool = False)-
Expand source code
class ADCPClient: """Client for interacting with a single AdCP agent.""" def __init__( self, agent_config: AgentConfig, webhook_url_template: str | None = None, webhook_secret: str | None = None, on_activity: Callable[[Activity], None] | None = None, webhook_timestamp_tolerance: int = 300, capabilities_ttl: float = 3600.0, validate_features: bool = False, ): """ Initialize ADCP client for a single agent. Args: agent_config: Agent configuration webhook_url_template: Template for webhook URLs with {agent_id}, {task_type}, {operation_id} webhook_secret: Secret for webhook signature verification on_activity: Callback for activity events webhook_timestamp_tolerance: Maximum age (in seconds) for webhook timestamps. Webhooks with timestamps older than this or more than this far in the future are rejected. Defaults to 300 (5 minutes). capabilities_ttl: Time-to-live in seconds for cached capabilities (default: 1 hour) validate_features: When True, automatically check that the seller supports required features before making task calls (e.g., sync_audiences requires audience_targeting). Requires capabilities to have been fetched first. """ self.agent_config = agent_config self.webhook_url_template = webhook_url_template self.webhook_secret = webhook_secret self.on_activity = on_activity self.webhook_timestamp_tolerance = webhook_timestamp_tolerance self.capabilities_ttl = capabilities_ttl self.validate_features = validate_features # Capabilities cache self._capabilities: GetAdcpCapabilitiesResponse | None = None self._feature_resolver: FeatureResolver | None = None self._capabilities_fetched_at: float | None = None # Initialize protocol adapter self.adapter: ProtocolAdapter if agent_config.protocol == Protocol.A2A: self.adapter = A2AAdapter(agent_config) elif agent_config.protocol == Protocol.MCP: self.adapter = MCPAdapter(agent_config) else: raise ValueError(f"Unsupported protocol: {agent_config.protocol}") # Initialize simple API accessor (lazy import to avoid circular dependency) from adcp.simple import SimpleAPI self.simple = SimpleAPI(self) def get_webhook_url(self, task_type: str, operation_id: str) -> str: """Generate webhook URL for a task.""" if not self.webhook_url_template: raise ValueError("webhook_url_template not configured") return self.webhook_url_template.format( agent_id=self.agent_config.id, task_type=task_type, operation_id=operation_id, ) def _emit_activity(self, activity: Activity) -> None: """Emit activity event.""" if self.on_activity: self.on_activity(activity) # ======================================================================== # Capability Validation # ======================================================================== @property def capabilities(self) -> GetAdcpCapabilitiesResponse | None: """Return cached capabilities, or None if not yet fetched.""" return self._capabilities @property def feature_resolver(self) -> FeatureResolver | None: """Return the FeatureResolver for cached capabilities, or None.""" return self._feature_resolver async def fetch_capabilities(self) -> GetAdcpCapabilitiesResponse: """Fetch capabilities, using cache if still valid. Returns: The seller's capabilities response. """ if self._capabilities is not None and self._capabilities_fetched_at is not None: elapsed = time.monotonic() - self._capabilities_fetched_at if elapsed < self.capabilities_ttl: return self._capabilities return await self.refresh_capabilities() async def refresh_capabilities(self) -> GetAdcpCapabilitiesResponse: """Fetch capabilities from the seller, bypassing cache. Returns: The seller's capabilities response. """ result = await self.get_adcp_capabilities(GetAdcpCapabilitiesRequest()) if result.success and result.data is not None: self._capabilities = result.data self._feature_resolver = FeatureResolver(result.data) self._capabilities_fetched_at = time.monotonic() return self._capabilities raise ADCPError( f"Failed to fetch capabilities: {result.error or result.message}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) def _ensure_resolver(self) -> FeatureResolver: """Return the FeatureResolver, raising if capabilities haven't been fetched.""" if self._feature_resolver is None: raise ADCPError( "Cannot check feature support: capabilities have not been fetched. " "Call fetch_capabilities() first.", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) return self._feature_resolver def supports(self, feature: str) -> bool: """Check if the seller supports a feature. Supports multiple feature namespaces: - Protocol support: ``supports("media_buy")`` checks ``supported_protocols`` - Extension support: ``supports("ext:scope3")`` checks ``extensions_supported`` - Targeting: ``supports("targeting.geo_countries")`` checks ``media_buy.execution.targeting`` - Media buy features: ``supports("audience_targeting")`` checks ``media_buy.features`` - Signals features: ``supports("catalog_signals")`` checks ``signals.features`` Args: feature: Feature identifier to check. Returns: True if the seller declares the feature as supported. Raises: ADCPError: If capabilities have not been fetched yet. """ return self._ensure_resolver().supports(feature) def require(self, *features: str) -> None: """Assert that the seller supports all listed features. Args: *features: Feature identifiers to require. Raises: ADCPFeatureUnsupportedError: If any features are not supported. ADCPError: If capabilities have not been fetched yet. """ self._ensure_resolver().require( *features, agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) def _validate_task_features(self, task_name: str) -> None: """Check feature requirements for a task if validate_features is enabled. Returns without checking if validate_features is False or capabilities haven't been fetched yet (logs a warning in the latter case). """ if not self.validate_features: return if self._feature_resolver is None: logger.warning( "validate_features is enabled but capabilities have not been fetched. " "Call fetch_capabilities() to enable feature validation." ) return required_feature = TASK_FEATURE_MAP.get(task_name) if required_feature is None: return self.require(required_feature) async def get_products( self, request: GetProductsRequest, fetch_previews: bool = False, preview_output_format: str = "url", creative_agent_client: ADCPClient | None = None, ) -> TaskResult[GetProductsResponse]: """ Get advertising products. Args: request: Request parameters fetch_previews: If True, generate preview URLs for each product's formats (uses batch API for 5-10x performance improvement) preview_output_format: "url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead) creative_agent_client: Client for creative agent (required if fetch_previews=True) Returns: TaskResult containing GetProductsResponse with optional preview URLs in metadata Raises: ValueError: If fetch_previews=True but creative_agent_client is not provided """ if fetch_previews and not creative_agent_client: raise ValueError("creative_agent_client is required when fetch_previews=True") operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_products", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_products(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_products", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) result: TaskResult[GetProductsResponse] = self.adapter._parse_response( raw_result, GetProductsResponse ) if fetch_previews and result.success and result.data and creative_agent_client: from adcp.utils.preview_cache import add_preview_urls_to_products products_with_previews = await add_preview_urls_to_products( result.data.products, creative_agent_client, use_batch=True, output_format=preview_output_format, ) result.metadata = result.metadata or {} result.metadata["products_with_previews"] = products_with_previews return result async def list_creative_formats( self, request: ListCreativeFormatsRequest, fetch_previews: bool = False, preview_output_format: str = "url", ) -> TaskResult[ListCreativeFormatsResponse]: """ List supported creative formats. Args: request: Request parameters fetch_previews: If True, generate preview URLs for each format using sample manifests (uses batch API for 5-10x performance improvement) preview_output_format: "url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead) Returns: TaskResult containing ListCreativeFormatsResponse with optional preview URLs in metadata """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_creative_formats", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_creative_formats(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_creative_formats", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) result: TaskResult[ListCreativeFormatsResponse] = self.adapter._parse_response( raw_result, ListCreativeFormatsResponse ) if fetch_previews and result.success and result.data: from adcp.utils.preview_cache import add_preview_urls_to_formats formats_with_previews = await add_preview_urls_to_formats( result.data.formats, self, use_batch=True, output_format=preview_output_format, ) result.metadata = result.metadata or {} result.metadata["formats_with_previews"] = formats_with_previews return result async def preview_creative( self, request: PreviewCreativeRequest, ) -> TaskResult[PreviewCreativeResponse]: """ Generate preview of a creative manifest. Args: request: Request parameters Returns: TaskResult containing PreviewCreativeResponse with preview URLs """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="preview_creative", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.preview_creative(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="preview_creative", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, PreviewCreativeResponse) async def sync_creatives( self, request: SyncCreativesRequest, ) -> TaskResult[SyncCreativesResponse]: """ Sync Creatives. Args: request: Request parameters Returns: TaskResult containing SyncCreativesResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_creatives", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_creatives(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_creatives", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncCreativesResponse) async def list_creatives( self, request: ListCreativesRequest, ) -> TaskResult[ListCreativesResponse]: """ List Creatives. Args: request: Request parameters Returns: TaskResult containing ListCreativesResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_creatives", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_creatives(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_creatives", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListCreativesResponse) async def get_media_buy_delivery( self, request: GetMediaBuyDeliveryRequest, ) -> TaskResult[GetMediaBuyDeliveryResponse]: """ Get Media Buy Delivery. Args: request: Request parameters Returns: TaskResult containing GetMediaBuyDeliveryResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buy_delivery", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_media_buy_delivery(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buy_delivery", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetMediaBuyDeliveryResponse) async def get_media_buys( self, request: GetMediaBuysRequest, ) -> TaskResult[GetMediaBuysResponse]: """ Get Media Buys. Args: request: Request parameters Returns: TaskResult containing GetMediaBuysResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buys", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_media_buys(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buys", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetMediaBuysResponse) async def get_signals( self, request: GetSignalsRequest, ) -> TaskResult[GetSignalsResponse]: """ Get Signals. Args: request: Request parameters Returns: TaskResult containing GetSignalsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_signals", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_signals(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_signals", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetSignalsResponse) async def activate_signal( self, request: ActivateSignalRequest, ) -> TaskResult[ActivateSignalResponse]: """ Activate Signal. Args: request: Request parameters Returns: TaskResult containing ActivateSignalResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="activate_signal", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.activate_signal(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="activate_signal", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ActivateSignalResponse) async def provide_performance_feedback( self, request: ProvidePerformanceFeedbackRequest, ) -> TaskResult[ProvidePerformanceFeedbackResponse]: """ Provide Performance Feedback. Args: request: Request parameters Returns: TaskResult containing ProvidePerformanceFeedbackResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="provide_performance_feedback", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.provide_performance_feedback(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="provide_performance_feedback", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ProvidePerformanceFeedbackResponse) async def create_media_buy( self, request: CreateMediaBuyRequest, ) -> TaskResult[CreateMediaBuyResponse]: """ Create a new media buy reservation. Requests the agent to reserve inventory for a campaign. The agent returns a media_buy_id that tracks this reservation and can be used for updates. Args: request: Media buy creation parameters including: - brand_manifest: Advertiser brand information and creative assets - packages: List of package requests specifying desired inventory - publisher_properties: Target properties for ad placement - budget: Optional budget constraints - start_date/end_date: Campaign flight dates Returns: TaskResult containing CreateMediaBuyResponse with: - media_buy_id: Unique identifier for this reservation - status: Current state of the media buy - packages: Confirmed package details - Additional platform-specific metadata Example: >>> from adcp import ADCPClient, CreateMediaBuyRequest >>> client = ADCPClient(agent_config) >>> request = CreateMediaBuyRequest( ... brand_manifest=brand, ... packages=[package_request], ... publisher_properties=properties ... ) >>> result = await client.create_media_buy(request) >>> if result.success: ... media_buy_id = result.data.media_buy_id """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_media_buy", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.create_media_buy(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_media_buy", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CreateMediaBuyResponse) async def update_media_buy( self, request: UpdateMediaBuyRequest, ) -> TaskResult[UpdateMediaBuyResponse]: """ Update an existing media buy reservation. Modifies a previously created media buy by updating packages or publisher properties. The update operation uses discriminated unions to specify what to change - either package details or targeting properties. Args: request: Media buy update parameters including: - media_buy_id: Identifier from create_media_buy response - updates: Discriminated union specifying update type: * UpdateMediaBuyPackagesRequest: Modify package selections * UpdateMediaBuyPropertiesRequest: Change targeting properties Returns: TaskResult containing UpdateMediaBuyResponse with: - media_buy_id: The updated media buy identifier - status: Updated state of the media buy - packages: Updated package configurations - Additional platform-specific metadata Example: >>> from adcp import ADCPClient, UpdateMediaBuyPackagesRequest >>> client = ADCPClient(agent_config) >>> request = UpdateMediaBuyPackagesRequest( ... media_buy_id="mb_123", ... packages=[updated_package] ... ) >>> result = await client.update_media_buy(request) >>> if result.success: ... updated_packages = result.data.packages """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_media_buy", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.update_media_buy(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_media_buy", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, UpdateMediaBuyResponse) async def build_creative( self, request: BuildCreativeRequest, ) -> TaskResult[BuildCreativeResponse]: """ Generate production-ready creative assets. Requests the creative agent to build final deliverable assets in the target format (e.g., VAST, DAAST, HTML5). This is typically called after previewing and approving a creative manifest. Args: request: Creative build parameters including: - manifest: Creative manifest with brand info and content - target_format_id: Desired output format identifier - inputs: Optional user-provided inputs for template variables - deployment: Platform or agent deployment configuration Returns: TaskResult containing BuildCreativeResponse with: - assets: Production-ready creative files (URLs or inline content) - format_id: The generated format identifier - manifest: The creative manifest used for generation - metadata: Additional platform-specific details Example: >>> from adcp import ADCPClient, BuildCreativeRequest >>> client = ADCPClient(agent_config) >>> request = BuildCreativeRequest( ... manifest=creative_manifest, ... target_format_id="vast_2.0", ... inputs={"duration": 30} ... ) >>> result = await client.build_creative(request) >>> if result.success: ... vast_url = result.data.assets[0].url """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="build_creative", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.build_creative(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="build_creative", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, BuildCreativeResponse) async def list_accounts( self, request: ListAccountsRequest, ) -> TaskResult[ListAccountsResponse]: """ List Accounts. Args: request: Request parameters Returns: TaskResult containing ListAccountsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_accounts", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_accounts(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_accounts", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListAccountsResponse) async def sync_accounts( self, request: SyncAccountsRequest, ) -> TaskResult[SyncAccountsResponse]: """ Sync Accounts. Args: request: Request parameters Returns: TaskResult containing SyncAccountsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_accounts", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_accounts(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_accounts", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncAccountsResponse) async def get_account_financials( self, request: GetAccountFinancialsRequest, ) -> TaskResult[GetAccountFinancialsResponse]: """ Get Account Financials. Args: request: Request parameters Returns: TaskResult containing GetAccountFinancialsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_account_financials", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_account_financials(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_account_financials", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetAccountFinancialsResponse) async def report_usage( self, request: ReportUsageRequest, ) -> TaskResult[ReportUsageResponse]: """ Report Usage. Args: request: Request parameters Returns: TaskResult containing ReportUsageResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="report_usage", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.report_usage(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="report_usage", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ReportUsageResponse) async def log_event( self, request: LogEventRequest, ) -> TaskResult[LogEventResponse]: """ Log Event. Args: request: Request parameters Returns: TaskResult containing LogEventResponse """ self._validate_task_features("log_event") operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="log_event", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.log_event(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="log_event", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, LogEventResponse) async def sync_event_sources( self, request: SyncEventSourcesRequest, ) -> TaskResult[SyncEventSourcesResponse]: """ Sync Event Sources. Args: request: Request parameters Returns: TaskResult containing SyncEventSourcesResponse """ self._validate_task_features("sync_event_sources") operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_event_sources", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_event_sources(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_event_sources", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncEventSourcesResponse) async def sync_audiences( self, request: SyncAudiencesRequest, ) -> TaskResult[SyncAudiencesResponse]: """ Sync Audiences. Args: request: Request parameters Returns: TaskResult containing SyncAudiencesResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_audiences", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_audiences(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_audiences", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncAudiencesResponse) async def sync_catalogs( self, request: SyncCatalogsRequest, ) -> TaskResult[SyncCatalogsResponse]: """ Sync Catalogs. Args: request: Request parameters Returns: TaskResult containing SyncCatalogsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_catalogs", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_catalogs(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_catalogs", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncCatalogsResponse) async def get_creative_delivery( self, request: GetCreativeDeliveryRequest, ) -> TaskResult[GetCreativeDeliveryResponse]: """ Get Creative Delivery. Args: request: Request parameters Returns: TaskResult containing GetCreativeDeliveryResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_creative_delivery", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_creative_delivery(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_creative_delivery", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetCreativeDeliveryResponse) # ======================================================================== # V3 Protocol Methods - Protocol Discovery # ======================================================================== async def get_adcp_capabilities( self, request: GetAdcpCapabilitiesRequest, ) -> TaskResult[GetAdcpCapabilitiesResponse]: """ Get AdCP capabilities from the agent. Queries the agent's supported AdCP features, protocol versions, and domain-specific capabilities (media_buy, signals, sponsored_intelligence). Args: request: Request parameters including optional protocol filters Returns: TaskResult containing GetAdcpCapabilitiesResponse with: - adcp: Core protocol version information - supported_protocols: List of supported domain protocols - media_buy: Media buy capabilities (if supported) - sponsored_intelligence: SI capabilities (if supported) - signals: Signals capabilities (if supported) """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_adcp_capabilities", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_adcp_capabilities(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_adcp_capabilities", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetAdcpCapabilitiesResponse) # ======================================================================== # V3 Protocol Methods - Content Standards # ======================================================================== async def create_content_standards( self, request: CreateContentStandardsRequest, ) -> TaskResult[CreateContentStandardsResponse]: """ Create a new content standards configuration. Defines acceptable content contexts for ad placement using natural language policy and optional calibration exemplars. Args: request: Request parameters including policy and scope Returns: TaskResult containing CreateContentStandardsResponse with standards_id """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_content_standards", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.create_content_standards(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_content_standards", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CreateContentStandardsResponse) async def get_content_standards( self, request: GetContentStandardsRequest, ) -> TaskResult[GetContentStandardsResponse]: """ Get a content standards configuration by ID. Args: request: Request parameters including standards_id Returns: TaskResult containing GetContentStandardsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_content_standards", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_content_standards(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_content_standards", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetContentStandardsResponse) async def list_content_standards( self, request: ListContentStandardsRequest, ) -> TaskResult[ListContentStandardsResponse]: """ List content standards configurations. Args: request: Request parameters including optional filters Returns: TaskResult containing ListContentStandardsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_content_standards", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_content_standards(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_content_standards", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListContentStandardsResponse) async def update_content_standards( self, request: UpdateContentStandardsRequest, ) -> TaskResult[UpdateContentStandardsResponse]: """ Update a content standards configuration. Args: request: Request parameters including standards_id and updates Returns: TaskResult containing UpdateContentStandardsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_content_standards", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.update_content_standards(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_content_standards", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, UpdateContentStandardsResponse) async def calibrate_content( self, request: CalibrateContentRequest, ) -> TaskResult[CalibrateContentResponse]: """ Calibrate content against standards. Evaluates content (artifact or URL) against configured standards to determine suitability for ad placement. Args: request: Request parameters including content to evaluate Returns: TaskResult containing CalibrateContentResponse with verdict """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="calibrate_content", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.calibrate_content(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="calibrate_content", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CalibrateContentResponse) async def validate_content_delivery( self, request: ValidateContentDeliveryRequest, ) -> TaskResult[ValidateContentDeliveryResponse]: """ Validate content delivery against standards. Validates that ad delivery records comply with content standards. Args: request: Request parameters including delivery records Returns: TaskResult containing ValidateContentDeliveryResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="validate_content_delivery", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.validate_content_delivery(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="validate_content_delivery", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ValidateContentDeliveryResponse) async def get_media_buy_artifacts( self, request: GetMediaBuyArtifactsRequest, ) -> TaskResult[GetMediaBuyArtifactsResponse]: """ Get artifacts associated with a media buy. Retrieves content artifacts where ads were delivered for a media buy. Args: request: Request parameters including media_buy_id Returns: TaskResult containing GetMediaBuyArtifactsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buy_artifacts", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_media_buy_artifacts(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buy_artifacts", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetMediaBuyArtifactsResponse) # ======================================================================== # V3 Protocol Methods - Sponsored Intelligence # ======================================================================== async def si_get_offering( self, request: SiGetOfferingRequest, ) -> TaskResult[SiGetOfferingResponse]: """ Get sponsored intelligence offering. Retrieves product/service offerings that can be presented in a sponsored intelligence session. Args: request: Request parameters including brand context Returns: TaskResult containing SiGetOfferingResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_get_offering", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.si_get_offering(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_get_offering", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SiGetOfferingResponse) async def si_initiate_session( self, request: SiInitiateSessionRequest, ) -> TaskResult[SiInitiateSessionResponse]: """ Initiate a sponsored intelligence session. Starts a conversational brand experience session with a user. Args: request: Request parameters including identity and context Returns: TaskResult containing SiInitiateSessionResponse with session_id """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_initiate_session", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.si_initiate_session(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_initiate_session", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SiInitiateSessionResponse) async def si_send_message( self, request: SiSendMessageRequest, ) -> TaskResult[SiSendMessageResponse]: """ Send a message in a sponsored intelligence session. Continues the conversation in an active SI session. Args: request: Request parameters including session_id and message Returns: TaskResult containing SiSendMessageResponse with brand response """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_send_message", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.si_send_message(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_send_message", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SiSendMessageResponse) async def si_terminate_session( self, request: SiTerminateSessionRequest, ) -> TaskResult[SiTerminateSessionResponse]: """ Terminate a sponsored intelligence session. Ends an active SI session, optionally with follow-up actions. Args: request: Request parameters including session_id and termination context Returns: TaskResult containing SiTerminateSessionResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_terminate_session", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.si_terminate_session(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_terminate_session", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SiTerminateSessionResponse) # ======================================================================== # V3 Governance Methods # ======================================================================== async def get_creative_features( self, request: GetCreativeFeaturesRequest, ) -> TaskResult[GetCreativeFeaturesResponse]: """Evaluate governance features for a creative manifest.""" operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_creative_features", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_creative_features(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_creative_features", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetCreativeFeaturesResponse) async def sync_plans( self, request: SyncPlansRequest, ) -> TaskResult[SyncPlansResponse]: """Sync campaign governance plans to the governance agent.""" operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_plans", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_plans(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_plans", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncPlansResponse) async def check_governance( self, request: CheckGovernanceRequest, ) -> TaskResult[CheckGovernanceResponse]: """Check a proposed or committed action against campaign governance.""" operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="check_governance", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.check_governance(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="check_governance", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CheckGovernanceResponse) async def report_plan_outcome( self, request: ReportPlanOutcomeRequest, ) -> TaskResult[ReportPlanOutcomeResponse]: """Report the outcome of a governed action to the governance agent.""" operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="report_plan_outcome", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.report_plan_outcome(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="report_plan_outcome", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ReportPlanOutcomeResponse) async def get_plan_audit_logs( self, request: GetPlanAuditLogsRequest, ) -> TaskResult[GetPlanAuditLogsResponse]: """Retrieve governance state and audit logs for one or more plans.""" operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_plan_audit_logs", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_plan_audit_logs(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_plan_audit_logs", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetPlanAuditLogsResponse) async def create_property_list( self, request: CreatePropertyListRequest, ) -> TaskResult[CreatePropertyListResponse]: """ Create a property list for governance filtering. Property lists define dynamic sets of properties based on filters, brand manifests, and feature requirements. Args: request: Request parameters for creating the property list Returns: TaskResult containing CreatePropertyListResponse with list_id """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_property_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.create_property_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_property_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CreatePropertyListResponse) async def get_property_list( self, request: GetPropertyListRequest, ) -> TaskResult[GetPropertyListResponse]: """ Get a property list with optional resolution. When resolve=true, returns the list of resolved property identifiers. Use this to get the actual properties that match the list's filters. Args: request: Request parameters including list_id and resolve flag Returns: TaskResult containing GetPropertyListResponse with identifiers """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_property_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_property_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_property_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetPropertyListResponse) async def list_property_lists( self, request: ListPropertyListsRequest, ) -> TaskResult[ListPropertyListsResponse]: """ List property lists owned by a principal. Retrieves metadata for all property lists, optionally filtered by principal or pagination parameters. Args: request: Request parameters with optional filtering Returns: TaskResult containing ListPropertyListsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_property_lists", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_property_lists(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_property_lists", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListPropertyListsResponse) async def update_property_list( self, request: UpdatePropertyListRequest, ) -> TaskResult[UpdatePropertyListResponse]: """ Update a property list. Modifies the filters, brand manifest, or other parameters of an existing property list. Args: request: Request parameters with list_id and updates Returns: TaskResult containing UpdatePropertyListResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_property_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.update_property_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_property_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, UpdatePropertyListResponse) async def delete_property_list( self, request: DeletePropertyListRequest, ) -> TaskResult[DeletePropertyListResponse]: """ Delete a property list. Removes a property list. Any active subscriptions to this list will be terminated. Args: request: Request parameters with list_id Returns: TaskResult containing DeletePropertyListResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="delete_property_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.delete_property_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="delete_property_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, DeletePropertyListResponse) async def list_tools(self) -> list[str]: """ List available tools from the agent. Returns: List of tool names """ return await self.adapter.list_tools() async def get_info(self) -> dict[str, Any]: """ Get agent information including AdCP extension metadata. Returns agent card information including: - Agent name, description, version - Protocol type (mcp or a2a) - AdCP version (from extensions.adcp.adcp_version) - Supported protocols (from extensions.adcp.protocols_supported) - Available tools/skills Returns: Dictionary with agent metadata """ return await self.adapter.get_agent_info() async def close(self) -> None: """Close the adapter and clean up resources.""" if hasattr(self.adapter, "close"): logger.debug(f"Closing adapter for agent {self.agent_config.id}") await self.adapter.close() async def __aenter__(self) -> ADCPClient: """Async context manager entry.""" return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """Async context manager exit.""" await self.close() def _verify_webhook_signature( self, payload: dict[str, Any], signature: str, timestamp: str, raw_body: bytes | str | None = None, ) -> bool: """ Verify HMAC-SHA256 signature of webhook payload. The verification algorithm matches get_adcp_signed_headers_for_webhook: 1. Constructs message as "{timestamp}.{json_payload}" 2. Uses raw HTTP body bytes when available (preserves sender's serialization) 3. Falls back to json.dumps() if raw_body not provided 4. HMAC-SHA256 signs with the shared secret 5. Compares against the provided signature (with "sha256=" prefix stripped) Args: payload: Webhook payload dict (used as fallback if raw_body not provided) signature: Signature to verify (with or without "sha256=" prefix) timestamp: Unix timestamp in seconds from X-AdCP-Timestamp header raw_body: Raw HTTP request body bytes. When provided, used directly for signature verification to avoid cross-language serialization mismatches. Strongly recommended for production use. Returns: True if signature is valid, False otherwise """ if not self.webhook_secret: logger.warning( "Webhook signature verification skipped: no webhook_secret configured" ) return True # Reject stale or future timestamps to prevent replay attacks try: ts = int(timestamp) except (ValueError, TypeError): return False now = int(time.time()) if abs(now - ts) > self.webhook_timestamp_tolerance: return False # Strip "sha256=" prefix if present if signature.startswith("sha256="): signature = signature[7:] # Use raw body if available (avoids cross-language serialization mismatches), # otherwise fall back to json.dumps() for backward compatibility if raw_body is not None: payload_str = raw_body.decode("utf-8") if isinstance(raw_body, bytes) else raw_body else: payload_str = json.dumps(payload) # Construct signed message: timestamp.payload signed_message = f"{timestamp}.{payload_str}" # Generate expected signature expected_signature = hmac.new( self.webhook_secret.encode("utf-8"), signed_message.encode("utf-8"), hashlib.sha256 ).hexdigest() return hmac.compare_digest(signature, expected_signature) def _parse_webhook_result( self, task_id: str, task_type: str, operation_id: str, status: GeneratedTaskStatus, result: Any, timestamp: datetime | str, message: str | None, context_id: str | None, ) -> TaskResult[AdcpAsyncResponseData]: """ Parse webhook data into typed TaskResult based on task_type. Args: task_id: Unique identifier for this task task_type: Task type from application routing (e.g., "get_products") operation_id: Operation identifier from application routing status: Current task status result: Task-specific payload (AdCP response data) timestamp: ISO 8601 timestamp when webhook was generated message: Human-readable summary of task state context_id: Session/conversation identifier Returns: TaskResult with task-specific typed response data Note: This method works with both MCP and A2A protocols by accepting protocol-agnostic parameters rather than protocol-specific objects. """ from adcp.utils.response_parser import parse_json_or_text # Map task types to their response types (using string literals, not enum) # Note: Some response types are Union types (e.g., ActivateSignalResponse = Success | Error) response_type_map: dict[str, type[BaseModel] | Any] = { # Core operations "get_products": GetProductsResponse, "list_creative_formats": ListCreativeFormatsResponse, "sync_creatives": SyncCreativesResponse, "list_creatives": ListCreativesResponse, "build_creative": BuildCreativeResponse, "preview_creative": PreviewCreativeResponse, "create_media_buy": CreateMediaBuyResponse, "update_media_buy": UpdateMediaBuyResponse, "get_media_buy_delivery": GetMediaBuyDeliveryResponse, "get_media_buys": GetMediaBuysResponse, "get_signals": GetSignalsResponse, "activate_signal": ActivateSignalResponse, "provide_performance_feedback": ProvidePerformanceFeedbackResponse, "report_usage": ReportUsageResponse, "get_account_financials": GetAccountFinancialsResponse, "list_accounts": ListAccountsResponse, "sync_accounts": SyncAccountsResponse, "log_event": LogEventResponse, "sync_event_sources": SyncEventSourcesResponse, "sync_audiences": SyncAudiencesResponse, "sync_catalogs": SyncCatalogsResponse, "get_creative_delivery": GetCreativeDeliveryResponse, # V3 Protocol Discovery "get_adcp_capabilities": GetAdcpCapabilitiesResponse, # V3 Content Standards "create_content_standards": CreateContentStandardsResponse, "get_content_standards": GetContentStandardsResponse, "list_content_standards": ListContentStandardsResponse, "update_content_standards": UpdateContentStandardsResponse, "calibrate_content": CalibrateContentResponse, "validate_content_delivery": ValidateContentDeliveryResponse, "get_media_buy_artifacts": GetMediaBuyArtifactsResponse, # V3 Sponsored Intelligence "si_get_offering": SiGetOfferingResponse, "si_initiate_session": SiInitiateSessionResponse, "si_send_message": SiSendMessageResponse, "si_terminate_session": SiTerminateSessionResponse, # V3 Governance "get_creative_features": GetCreativeFeaturesResponse, "sync_plans": SyncPlansResponse, "check_governance": CheckGovernanceResponse, "report_plan_outcome": ReportPlanOutcomeResponse, "get_plan_audit_logs": GetPlanAuditLogsResponse, "create_property_list": CreatePropertyListResponse, "get_property_list": GetPropertyListResponse, "list_property_lists": ListPropertyListsResponse, "update_property_list": UpdatePropertyListResponse, "delete_property_list": DeletePropertyListResponse, } # Handle completed tasks with result parsing if status == GeneratedTaskStatus.completed and result is not None: response_type = response_type_map.get(task_type) if response_type: try: parsed_result: Any = parse_json_or_text(result, response_type) return TaskResult[AdcpAsyncResponseData]( status=TaskStatus.COMPLETED, data=parsed_result, success=True, metadata={ "task_id": task_id, "operation_id": operation_id, "timestamp": timestamp, "message": message, }, ) except ValueError as e: logger.warning(f"Failed to parse webhook result: {e}") # Fall through to untyped result # Handle failed, input-required, or unparseable results # Convert status to core TaskStatus enum status_map = { GeneratedTaskStatus.completed: TaskStatus.COMPLETED, GeneratedTaskStatus.submitted: TaskStatus.SUBMITTED, GeneratedTaskStatus.working: TaskStatus.WORKING, GeneratedTaskStatus.failed: TaskStatus.FAILED, GeneratedTaskStatus.input_required: TaskStatus.NEEDS_INPUT, } task_status = status_map.get(status, TaskStatus.FAILED) # Extract error message from result.errors if present error_message: str | None = None if result is not None and hasattr(result, "errors"): errors = getattr(result, "errors", None) if errors and len(errors) > 0: first_error = errors[0] if hasattr(first_error, "message"): error_message = first_error.message return TaskResult[AdcpAsyncResponseData]( status=task_status, data=result, success=status == GeneratedTaskStatus.completed, error=error_message, metadata={ "task_id": task_id, "operation_id": operation_id, "timestamp": timestamp, "message": message, "context_id": context_id, }, ) async def _handle_mcp_webhook( self, payload: dict[str, Any], task_type: str, operation_id: str, signature: str | None, timestamp: str | None = None, raw_body: bytes | str | None = None, ) -> TaskResult[AdcpAsyncResponseData]: """ Handle MCP webhook delivered via HTTP POST. Args: payload: Webhook payload dict task_type: Task type from application routing operation_id: Operation identifier from application routing signature: Optional HMAC-SHA256 signature for verification (X-AdCP-Signature header) timestamp: Optional Unix timestamp for signature verification (X-AdCP-Timestamp header) raw_body: Optional raw HTTP request body for signature verification Returns: TaskResult with parsed task-specific response data Raises: ADCPWebhookSignatureError: If signature verification fails ValidationError: If payload doesn't match McpWebhookPayload schema """ from adcp.types.generated_poc.core.mcp_webhook_payload import McpWebhookPayload # When a webhook_secret is configured, require signed webhooks if self.webhook_secret: if not signature or not timestamp: raise ADCPWebhookSignatureError( "Webhook signature and timestamp headers are required" ) if not self._verify_webhook_signature(payload, signature, timestamp, raw_body): logger.warning( f"Webhook signature verification failed for agent {self.agent_config.id}" ) raise ADCPWebhookSignatureError("Invalid webhook signature") # Validate and parse MCP webhook payload webhook = McpWebhookPayload.model_validate(payload) # Emit activity for monitoring self._emit_activity( Activity( type=ActivityType.WEBHOOK_RECEIVED, operation_id=operation_id, agent_id=self.agent_config.id, task_type=task_type, timestamp=datetime.now(timezone.utc).isoformat(), metadata={"payload": payload, "protocol": "mcp"}, ) ) # Extract fields and parse result return self._parse_webhook_result( task_id=webhook.task_id, task_type=task_type, operation_id=operation_id, status=webhook.status, result=webhook.result, timestamp=webhook.timestamp, message=webhook.message, context_id=webhook.context_id, ) async def _handle_a2a_webhook( self, payload: Task | TaskStatusUpdateEvent, task_type: str, operation_id: str ) -> TaskResult[AdcpAsyncResponseData]: """ Handle A2A webhook delivered through Task or TaskStatusUpdateEvent. Per A2A specification: - Terminated statuses (completed, failed): Payload is Task with artifacts[].parts[] - Intermediate statuses (working, input-required, submitted): Payload is TaskStatusUpdateEvent with status.message.parts[] Args: payload: A2A Task or TaskStatusUpdateEvent object task_type: Task type from application routing operation_id: Operation identifier from application routing Returns: TaskResult with parsed task-specific response data Note: Signature verification is NOT applicable for A2A webhooks as they arrive through authenticated A2A connections, not HTTP. """ from a2a.types import DataPart, TextPart adcp_data: Any = None text_message: str | None = None task_id: str context_id: str | None status_state: str timestamp: datetime | str # Type detection and extraction based on payload type if isinstance(payload, TaskStatusUpdateEvent): # Intermediate status: Extract from status.message.parts[] task_id = payload.task_id context_id = payload.context_id status_state = payload.status.state if payload.status else "failed" timestamp = ( payload.status.timestamp if payload.status and payload.status.timestamp else datetime.now(timezone.utc) ) # Extract from status.message.parts[] if payload.status and payload.status.message and payload.status.message.parts: # Extract DataPart for structured AdCP payload data_parts = [ p.root for p in payload.status.message.parts if isinstance(p.root, DataPart) ] if data_parts: # Use last DataPart as authoritative last_data_part = data_parts[-1] adcp_data = last_data_part.data # Unwrap {"response": {...}} wrapper if present (ADK pattern) if isinstance(adcp_data, dict) and "response" in adcp_data: adcp_data = adcp_data["response"] # Extract TextPart for human-readable message for part in payload.status.message.parts: if isinstance(part.root, TextPart): text_message = part.root.text break else: # Terminated status (Task): Extract from artifacts[].parts[] task_id = payload.id context_id = payload.context_id status_state = payload.status.state if payload.status else "failed" timestamp = ( payload.status.timestamp if payload.status and payload.status.timestamp else datetime.now(timezone.utc) ) # Extract from task.artifacts[].parts[] # Following A2A spec: use last artifact, last DataPart is authoritative if payload.artifacts: # Use last artifact (most recent in streaming scenarios) target_artifact = payload.artifacts[-1] if target_artifact.parts: # Extract DataPart for structured AdCP payload data_parts = [ p.root for p in target_artifact.parts if isinstance(p.root, DataPart) ] if data_parts: # Use last DataPart as authoritative last_data_part = data_parts[-1] adcp_data = last_data_part.data # Unwrap {"response": {...}} wrapper if present (ADK pattern) if isinstance(adcp_data, dict) and "response" in adcp_data: adcp_data = adcp_data["response"] # Extract TextPart for human-readable message for part in target_artifact.parts: if isinstance(part.root, TextPart): text_message = part.root.text break # Map A2A status.state to GeneratedTaskStatus enum status_map = { "completed": GeneratedTaskStatus.completed, "submitted": GeneratedTaskStatus.submitted, "working": GeneratedTaskStatus.working, "failed": GeneratedTaskStatus.failed, "input-required": GeneratedTaskStatus.input_required, "input_required": GeneratedTaskStatus.input_required, # Handle both formats } mapped_status = status_map.get(status_state, GeneratedTaskStatus.failed) # Emit activity for monitoring self._emit_activity( Activity( type=ActivityType.WEBHOOK_RECEIVED, operation_id=operation_id, agent_id=self.agent_config.id, task_type=task_type, timestamp=datetime.now(timezone.utc).isoformat(), metadata={ "task_id": task_id, "protocol": "a2a", "payload_type": ( "TaskStatusUpdateEvent" if isinstance(payload, TaskStatusUpdateEvent) else "Task" ), }, ) ) # Parse and return typed result by passing extracted fields directly return self._parse_webhook_result( task_id=task_id, task_type=task_type, operation_id=operation_id, status=mapped_status, result=adcp_data, timestamp=timestamp, message=text_message, context_id=context_id, ) async def handle_webhook( self, payload: dict[str, Any] | Task | TaskStatusUpdateEvent, task_type: str, operation_id: str, signature: str | None = None, timestamp: str | None = None, raw_body: bytes | str | None = None, ) -> TaskResult[AdcpAsyncResponseData]: """ Handle incoming webhook and return typed result. This method provides a unified interface for handling webhooks from both MCP and A2A protocols: - MCP Webhooks: HTTP POST with dict payload, optional HMAC signature - A2A Webhooks: Task or TaskStatusUpdateEvent objects based on status The method automatically detects the protocol type and routes to the appropriate handler. Both protocols return a consistent TaskResult structure with typed AdCP response data. Args: payload: Webhook payload - one of: - dict[str, Any]: MCP webhook payload from HTTP POST - Task: A2A webhook for terminated statuses (completed, failed) - TaskStatusUpdateEvent: A2A webhook for intermediate statuses (working, input-required, submitted) task_type: Task type from application routing (e.g., "get_products"). Applications should extract this from URL routing pattern: /webhook/{task_type}/{agent_id}/{operation_id} operation_id: Operation identifier from application routing. Used to correlate webhook notifications with original task submission. signature: Optional HMAC-SHA256 signature for MCP webhook verification (X-AdCP-Signature header). Ignored for A2A webhooks. timestamp: Optional Unix timestamp (seconds) for MCP webhook signature verification (X-AdCP-Timestamp header). Required when signature is provided. raw_body: Optional raw HTTP request body bytes for signature verification. When provided, used directly instead of re-serializing the payload, avoiding cross-language JSON serialization mismatches. Strongly recommended for production use. Returns: TaskResult with parsed task-specific response data. The structure is identical regardless of protocol. Raises: ADCPWebhookSignatureError: If MCP signature verification fails ValidationError: If MCP payload doesn't match WebhookPayload schema Note: task_type and operation_id were deprecated from the webhook payload per AdCP specification. Applications must extract these from URL routing and pass them explicitly. Examples: MCP webhook (HTTP endpoint): >>> @app.post("/webhook/{task_type}/{agent_id}/{operation_id}") >>> async def webhook_handler(task_type: str, operation_id: str, request: Request): >>> raw_body = await request.body() >>> payload = json.loads(raw_body) >>> signature = request.headers.get("X-AdCP-Signature") >>> timestamp = request.headers.get("X-AdCP-Timestamp") >>> result = await client.handle_webhook( >>> payload, task_type, operation_id, signature, timestamp, >>> raw_body=raw_body, >>> ) >>> if result.success: >>> print(f"Task completed: {result.data}") A2A webhook with Task (terminated status): >>> async def on_task_completed(task: Task): >>> # Extract task_type and operation_id from your app's task tracking >>> task_type = your_task_registry.get_type(task.id) >>> operation_id = your_task_registry.get_operation_id(task.id) >>> result = await client.handle_webhook( >>> task, task_type, operation_id >>> ) >>> if result.success: >>> print(f"Task completed: {result.data}") A2A webhook with TaskStatusUpdateEvent (intermediate status): >>> async def on_task_update(event: TaskStatusUpdateEvent): >>> # Extract task_type and operation_id from your app's task tracking >>> task_type = your_task_registry.get_type(event.task_id) >>> operation_id = your_task_registry.get_operation_id(event.task_id) >>> result = await client.handle_webhook( >>> event, task_type, operation_id >>> ) >>> if result.status == GeneratedTaskStatus.working: >>> print(f"Task still working: {result.metadata.get('message')}") """ # Detect protocol type and route to appropriate handler if isinstance(payload, (Task, TaskStatusUpdateEvent)): # A2A webhook (Task or TaskStatusUpdateEvent) return await self._handle_a2a_webhook(payload, task_type, operation_id) else: # MCP webhook (dict payload) return await self._handle_mcp_webhook( payload, task_type, operation_id, signature, timestamp, raw_body )Client for interacting with a single AdCP agent.
Initialize ADCP client for a single agent.
Args
agent_config- Agent configuration
webhook_url_template- Template for webhook URLs with {agent_id}, {task_type}, {operation_id}
webhook_secret- Secret for webhook signature verification
on_activity- Callback for activity events
webhook_timestamp_tolerance- Maximum age (in seconds) for webhook timestamps. Webhooks with timestamps older than this or more than this far in the future are rejected. Defaults to 300 (5 minutes).
capabilities_ttl- Time-to-live in seconds for cached capabilities (default: 1 hour)
validate_features- When True, automatically check that the seller supports required features before making task calls (e.g., sync_audiences requires audience_targeting). Requires capabilities to have been fetched first.
Instance variables
prop capabilities : GetAdcpCapabilitiesResponse | None-
Expand source code
@property def capabilities(self) -> GetAdcpCapabilitiesResponse | None: """Return cached capabilities, or None if not yet fetched.""" return self._capabilitiesReturn cached capabilities, or None if not yet fetched.
prop feature_resolver : FeatureResolver | None-
Expand source code
@property def feature_resolver(self) -> FeatureResolver | None: """Return the FeatureResolver for cached capabilities, or None.""" return self._feature_resolverReturn the FeatureResolver for cached capabilities, or None.
Methods
async def activate_signal(self, request: ActivateSignalRequest) ‑> TaskResult[Union[ActivateSignalResponse1, ActivateSignalResponse2]]-
Expand source code
async def activate_signal( self, request: ActivateSignalRequest, ) -> TaskResult[ActivateSignalResponse]: """ Activate Signal. Args: request: Request parameters Returns: TaskResult containing ActivateSignalResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="activate_signal", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.activate_signal(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="activate_signal", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ActivateSignalResponse)Activate Signal.
Args
request- Request parameters
Returns
TaskResult containing ActivateSignalResponse
async def build_creative(self, request: BuildCreativeRequest) ‑> TaskResult[Union[BuildCreativeResponse1, BuildCreativeResponse2, BuildCreativeResponse3]]-
Expand source code
async def build_creative( self, request: BuildCreativeRequest, ) -> TaskResult[BuildCreativeResponse]: """ Generate production-ready creative assets. Requests the creative agent to build final deliverable assets in the target format (e.g., VAST, DAAST, HTML5). This is typically called after previewing and approving a creative manifest. Args: request: Creative build parameters including: - manifest: Creative manifest with brand info and content - target_format_id: Desired output format identifier - inputs: Optional user-provided inputs for template variables - deployment: Platform or agent deployment configuration Returns: TaskResult containing BuildCreativeResponse with: - assets: Production-ready creative files (URLs or inline content) - format_id: The generated format identifier - manifest: The creative manifest used for generation - metadata: Additional platform-specific details Example: >>> from adcp import ADCPClient, BuildCreativeRequest >>> client = ADCPClient(agent_config) >>> request = BuildCreativeRequest( ... manifest=creative_manifest, ... target_format_id="vast_2.0", ... inputs={"duration": 30} ... ) >>> result = await client.build_creative(request) >>> if result.success: ... vast_url = result.data.assets[0].url """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="build_creative", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.build_creative(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="build_creative", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, BuildCreativeResponse)Generate production-ready creative assets.
Requests the creative agent to build final deliverable assets in the target format (e.g., VAST, DAAST, HTML5). This is typically called after previewing and approving a creative manifest.
Args
request- Creative build parameters including: - manifest: Creative manifest with brand info and content - target_format_id: Desired output format identifier - inputs: Optional user-provided inputs for template variables - deployment: Platform or agent deployment configuration
Returns
TaskResult containing BuildCreativeResponse with: - assets: Production-ready creative files (URLs or inline content) - format_id: The generated format identifier - manifest: The creative manifest used for generation - metadata: Additional platform-specific details
Example
>>> from adcp import ADCPClient, BuildCreativeRequest >>> client = ADCPClient(agent_config) >>> request = BuildCreativeRequest( ... manifest=creative_manifest, ... target_format_id="vast_2.0", ... inputs={"duration": 30} ... ) >>> result = await client.build_creative(request) >>> if result.success: ... vast_url = result.data.assets[0].url async def calibrate_content(self, request: CalibrateContentRequest) ‑> TaskResult[Union[CalibrateContentResponse1, CalibrateContentResponse2]]-
Expand source code
async def calibrate_content( self, request: CalibrateContentRequest, ) -> TaskResult[CalibrateContentResponse]: """ Calibrate content against standards. Evaluates content (artifact or URL) against configured standards to determine suitability for ad placement. Args: request: Request parameters including content to evaluate Returns: TaskResult containing CalibrateContentResponse with verdict """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="calibrate_content", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.calibrate_content(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="calibrate_content", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CalibrateContentResponse)Calibrate content against standards.
Evaluates content (artifact or URL) against configured standards to determine suitability for ad placement.
Args
request- Request parameters including content to evaluate
Returns
TaskResult containing CalibrateContentResponse with verdict
async def check_governance(self, request: CheckGovernanceRequest) ‑> TaskResult[CheckGovernanceResponse]-
Expand source code
async def check_governance( self, request: CheckGovernanceRequest, ) -> TaskResult[CheckGovernanceResponse]: """Check a proposed or committed action against campaign governance.""" operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="check_governance", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.check_governance(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="check_governance", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CheckGovernanceResponse)Check a proposed or committed action against campaign governance.
async def close(self) ‑> None-
Expand source code
async def close(self) -> None: """Close the adapter and clean up resources.""" if hasattr(self.adapter, "close"): logger.debug(f"Closing adapter for agent {self.agent_config.id}") await self.adapter.close()Close the adapter and clean up resources.
async def create_content_standards(self, request: CreateContentStandardsRequest) ‑> TaskResult[Union[CreateContentStandardsResponse1, CreateContentStandardsResponse2]]-
Expand source code
async def create_content_standards( self, request: CreateContentStandardsRequest, ) -> TaskResult[CreateContentStandardsResponse]: """ Create a new content standards configuration. Defines acceptable content contexts for ad placement using natural language policy and optional calibration exemplars. Args: request: Request parameters including policy and scope Returns: TaskResult containing CreateContentStandardsResponse with standards_id """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_content_standards", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.create_content_standards(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_content_standards", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CreateContentStandardsResponse)Create a new content standards configuration.
Defines acceptable content contexts for ad placement using natural language policy and optional calibration exemplars.
Args
request- Request parameters including policy and scope
Returns
TaskResult containing CreateContentStandardsResponse with standards_id
async def create_media_buy(self, request: CreateMediaBuyRequest) ‑> TaskResult[Union[CreateMediaBuyResponse1, CreateMediaBuyResponse2]]-
Expand source code
async def create_media_buy( self, request: CreateMediaBuyRequest, ) -> TaskResult[CreateMediaBuyResponse]: """ Create a new media buy reservation. Requests the agent to reserve inventory for a campaign. The agent returns a media_buy_id that tracks this reservation and can be used for updates. Args: request: Media buy creation parameters including: - brand_manifest: Advertiser brand information and creative assets - packages: List of package requests specifying desired inventory - publisher_properties: Target properties for ad placement - budget: Optional budget constraints - start_date/end_date: Campaign flight dates Returns: TaskResult containing CreateMediaBuyResponse with: - media_buy_id: Unique identifier for this reservation - status: Current state of the media buy - packages: Confirmed package details - Additional platform-specific metadata Example: >>> from adcp import ADCPClient, CreateMediaBuyRequest >>> client = ADCPClient(agent_config) >>> request = CreateMediaBuyRequest( ... brand_manifest=brand, ... packages=[package_request], ... publisher_properties=properties ... ) >>> result = await client.create_media_buy(request) >>> if result.success: ... media_buy_id = result.data.media_buy_id """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_media_buy", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.create_media_buy(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_media_buy", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CreateMediaBuyResponse)Create a new media buy reservation.
Requests the agent to reserve inventory for a campaign. The agent returns a media_buy_id that tracks this reservation and can be used for updates.
Args
request- Media buy creation parameters including: - brand_manifest: Advertiser brand information and creative assets - packages: List of package requests specifying desired inventory - publisher_properties: Target properties for ad placement - budget: Optional budget constraints - start_date/end_date: Campaign flight dates
Returns
TaskResult containing CreateMediaBuyResponse with: - media_buy_id: Unique identifier for this reservation - status: Current state of the media buy - packages: Confirmed package details - Additional platform-specific metadata
Example
>>> from adcp import ADCPClient, CreateMediaBuyRequest >>> client = ADCPClient(agent_config) >>> request = CreateMediaBuyRequest( ... brand_manifest=brand, ... packages=[package_request], ... publisher_properties=properties ... ) >>> result = await client.create_media_buy(request) >>> if result.success: ... media_buy_id = result.data.media_buy_id async def create_property_list(self, request: CreatePropertyListRequest) ‑> TaskResult[CreatePropertyListResponse]-
Expand source code
async def create_property_list( self, request: CreatePropertyListRequest, ) -> TaskResult[CreatePropertyListResponse]: """ Create a property list for governance filtering. Property lists define dynamic sets of properties based on filters, brand manifests, and feature requirements. Args: request: Request parameters for creating the property list Returns: TaskResult containing CreatePropertyListResponse with list_id """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_property_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.create_property_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_property_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CreatePropertyListResponse)Create a property list for governance filtering.
Property lists define dynamic sets of properties based on filters, brand manifests, and feature requirements.
Args
request- Request parameters for creating the property list
Returns
TaskResult containing CreatePropertyListResponse with list_id
async def delete_property_list(self, request: DeletePropertyListRequest) ‑> TaskResult[DeletePropertyListResponse]-
Expand source code
async def delete_property_list( self, request: DeletePropertyListRequest, ) -> TaskResult[DeletePropertyListResponse]: """ Delete a property list. Removes a property list. Any active subscriptions to this list will be terminated. Args: request: Request parameters with list_id Returns: TaskResult containing DeletePropertyListResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="delete_property_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.delete_property_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="delete_property_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, DeletePropertyListResponse)Delete a property list.
Removes a property list. Any active subscriptions to this list will be terminated.
Args
request- Request parameters with list_id
Returns
TaskResult containing DeletePropertyListResponse
async def fetch_capabilities(self) ‑> GetAdcpCapabilitiesResponse-
Expand source code
async def fetch_capabilities(self) -> GetAdcpCapabilitiesResponse: """Fetch capabilities, using cache if still valid. Returns: The seller's capabilities response. """ if self._capabilities is not None and self._capabilities_fetched_at is not None: elapsed = time.monotonic() - self._capabilities_fetched_at if elapsed < self.capabilities_ttl: return self._capabilities return await self.refresh_capabilities()Fetch capabilities, using cache if still valid.
Returns
The seller's capabilities response.
async def get_account_financials(self, request: GetAccountFinancialsRequest) ‑> TaskResult[Union[GetAccountFinancialsResponse1, GetAccountFinancialsResponse2]]-
Expand source code
async def get_account_financials( self, request: GetAccountFinancialsRequest, ) -> TaskResult[GetAccountFinancialsResponse]: """ Get Account Financials. Args: request: Request parameters Returns: TaskResult containing GetAccountFinancialsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_account_financials", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_account_financials(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_account_financials", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetAccountFinancialsResponse)Get Account Financials.
Args
request- Request parameters
Returns
TaskResult containing GetAccountFinancialsResponse
async def get_adcp_capabilities(self, request: GetAdcpCapabilitiesRequest) ‑> TaskResult[GetAdcpCapabilitiesResponse]-
Expand source code
async def get_adcp_capabilities( self, request: GetAdcpCapabilitiesRequest, ) -> TaskResult[GetAdcpCapabilitiesResponse]: """ Get AdCP capabilities from the agent. Queries the agent's supported AdCP features, protocol versions, and domain-specific capabilities (media_buy, signals, sponsored_intelligence). Args: request: Request parameters including optional protocol filters Returns: TaskResult containing GetAdcpCapabilitiesResponse with: - adcp: Core protocol version information - supported_protocols: List of supported domain protocols - media_buy: Media buy capabilities (if supported) - sponsored_intelligence: SI capabilities (if supported) - signals: Signals capabilities (if supported) """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_adcp_capabilities", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_adcp_capabilities(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_adcp_capabilities", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetAdcpCapabilitiesResponse)Get AdCP capabilities from the agent.
Queries the agent's supported AdCP features, protocol versions, and domain-specific capabilities (media_buy, signals, sponsored_intelligence).
Args
request- Request parameters including optional protocol filters
Returns
TaskResult containing GetAdcpCapabilitiesResponse with: - adcp: Core protocol version information - supported_protocols: List of supported domain protocols - media_buy: Media buy capabilities (if supported) - sponsored_intelligence: SI capabilities (if supported) - signals: Signals capabilities (if supported)
async def get_content_standards(self, request: GetContentStandardsRequest) ‑> TaskResult[Union[GetContentStandardsResponse1, GetContentStandardsResponse2]]-
Expand source code
async def get_content_standards( self, request: GetContentStandardsRequest, ) -> TaskResult[GetContentStandardsResponse]: """ Get a content standards configuration by ID. Args: request: Request parameters including standards_id Returns: TaskResult containing GetContentStandardsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_content_standards", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_content_standards(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_content_standards", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetContentStandardsResponse)Get a content standards configuration by ID.
Args
request- Request parameters including standards_id
Returns
TaskResult containing GetContentStandardsResponse
async def get_creative_delivery(self, request: GetCreativeDeliveryRequest) ‑> TaskResult[GetCreativeDeliveryResponse]-
Expand source code
async def get_creative_delivery( self, request: GetCreativeDeliveryRequest, ) -> TaskResult[GetCreativeDeliveryResponse]: """ Get Creative Delivery. Args: request: Request parameters Returns: TaskResult containing GetCreativeDeliveryResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_creative_delivery", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_creative_delivery(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_creative_delivery", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetCreativeDeliveryResponse)Get Creative Delivery.
Args
request- Request parameters
Returns
TaskResult containing GetCreativeDeliveryResponse
async def get_creative_features(self, request: GetCreativeFeaturesRequest) ‑> TaskResult[Union[GetCreativeFeaturesResponse1, GetCreativeFeaturesResponse2]]-
Expand source code
async def get_creative_features( self, request: GetCreativeFeaturesRequest, ) -> TaskResult[GetCreativeFeaturesResponse]: """Evaluate governance features for a creative manifest.""" operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_creative_features", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_creative_features(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_creative_features", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetCreativeFeaturesResponse)Evaluate governance features for a creative manifest.
async def get_info(self) ‑> dict[str, typing.Any]-
Expand source code
async def get_info(self) -> dict[str, Any]: """ Get agent information including AdCP extension metadata. Returns agent card information including: - Agent name, description, version - Protocol type (mcp or a2a) - AdCP version (from extensions.adcp.adcp_version) - Supported protocols (from extensions.adcp.protocols_supported) - Available tools/skills Returns: Dictionary with agent metadata """ return await self.adapter.get_agent_info()Get agent information including AdCP extension metadata.
Returns agent card information including: - Agent name, description, version - Protocol type (mcp or a2a) - AdCP version (from extensions.adcp.adcp_version) - Supported protocols (from extensions.adcp.protocols_supported) - Available tools/skills
Returns
Dictionary with agent metadata
async def get_media_buy_artifacts(self, request: GetMediaBuyArtifactsRequest) ‑> TaskResult[Union[GetMediaBuyArtifactsResponse1, GetMediaBuyArtifactsResponse2]]-
Expand source code
async def get_media_buy_artifacts( self, request: GetMediaBuyArtifactsRequest, ) -> TaskResult[GetMediaBuyArtifactsResponse]: """ Get artifacts associated with a media buy. Retrieves content artifacts where ads were delivered for a media buy. Args: request: Request parameters including media_buy_id Returns: TaskResult containing GetMediaBuyArtifactsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buy_artifacts", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_media_buy_artifacts(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buy_artifacts", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetMediaBuyArtifactsResponse)Get artifacts associated with a media buy.
Retrieves content artifacts where ads were delivered for a media buy.
Args
request- Request parameters including media_buy_id
Returns
TaskResult containing GetMediaBuyArtifactsResponse
async def get_media_buy_delivery(self, request: GetMediaBuyDeliveryRequest) ‑> TaskResult[GetMediaBuyDeliveryResponse]-
Expand source code
async def get_media_buy_delivery( self, request: GetMediaBuyDeliveryRequest, ) -> TaskResult[GetMediaBuyDeliveryResponse]: """ Get Media Buy Delivery. Args: request: Request parameters Returns: TaskResult containing GetMediaBuyDeliveryResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buy_delivery", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_media_buy_delivery(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buy_delivery", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetMediaBuyDeliveryResponse)Get Media Buy Delivery.
Args
request- Request parameters
Returns
TaskResult containing GetMediaBuyDeliveryResponse
async def get_media_buys(self, request: GetMediaBuysRequest) ‑> TaskResult[GetMediaBuysResponse]-
Expand source code
async def get_media_buys( self, request: GetMediaBuysRequest, ) -> TaskResult[GetMediaBuysResponse]: """ Get Media Buys. Args: request: Request parameters Returns: TaskResult containing GetMediaBuysResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buys", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_media_buys(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buys", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetMediaBuysResponse)Get Media Buys.
Args
request- Request parameters
Returns
TaskResult containing GetMediaBuysResponse
async def get_plan_audit_logs(self, request: GetPlanAuditLogsRequest) ‑> TaskResult[GetPlanAuditLogsResponse]-
Expand source code
async def get_plan_audit_logs( self, request: GetPlanAuditLogsRequest, ) -> TaskResult[GetPlanAuditLogsResponse]: """Retrieve governance state and audit logs for one or more plans.""" operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_plan_audit_logs", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_plan_audit_logs(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_plan_audit_logs", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetPlanAuditLogsResponse)Retrieve governance state and audit logs for one or more plans.
async def get_products(self,
request: GetProductsRequest,
fetch_previews: bool = False,
preview_output_format: str = 'url',
creative_agent_client: ADCPClient | None = None) ‑> TaskResult[GetProductsResponse]-
Expand source code
async def get_products( self, request: GetProductsRequest, fetch_previews: bool = False, preview_output_format: str = "url", creative_agent_client: ADCPClient | None = None, ) -> TaskResult[GetProductsResponse]: """ Get advertising products. Args: request: Request parameters fetch_previews: If True, generate preview URLs for each product's formats (uses batch API for 5-10x performance improvement) preview_output_format: "url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead) creative_agent_client: Client for creative agent (required if fetch_previews=True) Returns: TaskResult containing GetProductsResponse with optional preview URLs in metadata Raises: ValueError: If fetch_previews=True but creative_agent_client is not provided """ if fetch_previews and not creative_agent_client: raise ValueError("creative_agent_client is required when fetch_previews=True") operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_products", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_products(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_products", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) result: TaskResult[GetProductsResponse] = self.adapter._parse_response( raw_result, GetProductsResponse ) if fetch_previews and result.success and result.data and creative_agent_client: from adcp.utils.preview_cache import add_preview_urls_to_products products_with_previews = await add_preview_urls_to_products( result.data.products, creative_agent_client, use_batch=True, output_format=preview_output_format, ) result.metadata = result.metadata or {} result.metadata["products_with_previews"] = products_with_previews return resultGet advertising products.
Args
request- Request parameters
fetch_previews- If True, generate preview URLs for each product's formats (uses batch API for 5-10x performance improvement)
preview_output_format- "url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead)
creative_agent_client- Client for creative agent (required if fetch_previews=True)
Returns
TaskResult containing GetProductsResponse with optional preview URLs in metadata
Raises
ValueError- If fetch_previews=True but creative_agent_client is not provided
async def get_property_list(self, request: GetPropertyListRequest) ‑> TaskResult[GetPropertyListResponse]-
Expand source code
async def get_property_list( self, request: GetPropertyListRequest, ) -> TaskResult[GetPropertyListResponse]: """ Get a property list with optional resolution. When resolve=true, returns the list of resolved property identifiers. Use this to get the actual properties that match the list's filters. Args: request: Request parameters including list_id and resolve flag Returns: TaskResult containing GetPropertyListResponse with identifiers """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_property_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_property_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_property_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetPropertyListResponse)Get a property list with optional resolution.
When resolve=true, returns the list of resolved property identifiers. Use this to get the actual properties that match the list's filters.
Args
request- Request parameters including list_id and resolve flag
Returns
TaskResult containing GetPropertyListResponse with identifiers
async def get_signals(self, request: GetSignalsRequest) ‑> TaskResult[GetSignalsResponse]-
Expand source code
async def get_signals( self, request: GetSignalsRequest, ) -> TaskResult[GetSignalsResponse]: """ Get Signals. Args: request: Request parameters Returns: TaskResult containing GetSignalsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_signals", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_signals(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_signals", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetSignalsResponse)Get Signals.
Args
request- Request parameters
Returns
TaskResult containing GetSignalsResponse
def get_webhook_url(self, task_type: str, operation_id: str) ‑> str-
Expand source code
def get_webhook_url(self, task_type: str, operation_id: str) -> str: """Generate webhook URL for a task.""" if not self.webhook_url_template: raise ValueError("webhook_url_template not configured") return self.webhook_url_template.format( agent_id=self.agent_config.id, task_type=task_type, operation_id=operation_id, )Generate webhook URL for a task.
async def handle_webhook(self,
payload: dict[str, Any] | Task | TaskStatusUpdateEvent,
task_type: str,
operation_id: str,
signature: str | None = None,
timestamp: str | None = None,
raw_body: bytes | str | None = None) ‑> TaskResult[AdcpAsyncResponseData]-
Expand source code
async def handle_webhook( self, payload: dict[str, Any] | Task | TaskStatusUpdateEvent, task_type: str, operation_id: str, signature: str | None = None, timestamp: str | None = None, raw_body: bytes | str | None = None, ) -> TaskResult[AdcpAsyncResponseData]: """ Handle incoming webhook and return typed result. This method provides a unified interface for handling webhooks from both MCP and A2A protocols: - MCP Webhooks: HTTP POST with dict payload, optional HMAC signature - A2A Webhooks: Task or TaskStatusUpdateEvent objects based on status The method automatically detects the protocol type and routes to the appropriate handler. Both protocols return a consistent TaskResult structure with typed AdCP response data. Args: payload: Webhook payload - one of: - dict[str, Any]: MCP webhook payload from HTTP POST - Task: A2A webhook for terminated statuses (completed, failed) - TaskStatusUpdateEvent: A2A webhook for intermediate statuses (working, input-required, submitted) task_type: Task type from application routing (e.g., "get_products"). Applications should extract this from URL routing pattern: /webhook/{task_type}/{agent_id}/{operation_id} operation_id: Operation identifier from application routing. Used to correlate webhook notifications with original task submission. signature: Optional HMAC-SHA256 signature for MCP webhook verification (X-AdCP-Signature header). Ignored for A2A webhooks. timestamp: Optional Unix timestamp (seconds) for MCP webhook signature verification (X-AdCP-Timestamp header). Required when signature is provided. raw_body: Optional raw HTTP request body bytes for signature verification. When provided, used directly instead of re-serializing the payload, avoiding cross-language JSON serialization mismatches. Strongly recommended for production use. Returns: TaskResult with parsed task-specific response data. The structure is identical regardless of protocol. Raises: ADCPWebhookSignatureError: If MCP signature verification fails ValidationError: If MCP payload doesn't match WebhookPayload schema Note: task_type and operation_id were deprecated from the webhook payload per AdCP specification. Applications must extract these from URL routing and pass them explicitly. Examples: MCP webhook (HTTP endpoint): >>> @app.post("/webhook/{task_type}/{agent_id}/{operation_id}") >>> async def webhook_handler(task_type: str, operation_id: str, request: Request): >>> raw_body = await request.body() >>> payload = json.loads(raw_body) >>> signature = request.headers.get("X-AdCP-Signature") >>> timestamp = request.headers.get("X-AdCP-Timestamp") >>> result = await client.handle_webhook( >>> payload, task_type, operation_id, signature, timestamp, >>> raw_body=raw_body, >>> ) >>> if result.success: >>> print(f"Task completed: {result.data}") A2A webhook with Task (terminated status): >>> async def on_task_completed(task: Task): >>> # Extract task_type and operation_id from your app's task tracking >>> task_type = your_task_registry.get_type(task.id) >>> operation_id = your_task_registry.get_operation_id(task.id) >>> result = await client.handle_webhook( >>> task, task_type, operation_id >>> ) >>> if result.success: >>> print(f"Task completed: {result.data}") A2A webhook with TaskStatusUpdateEvent (intermediate status): >>> async def on_task_update(event: TaskStatusUpdateEvent): >>> # Extract task_type and operation_id from your app's task tracking >>> task_type = your_task_registry.get_type(event.task_id) >>> operation_id = your_task_registry.get_operation_id(event.task_id) >>> result = await client.handle_webhook( >>> event, task_type, operation_id >>> ) >>> if result.status == GeneratedTaskStatus.working: >>> print(f"Task still working: {result.metadata.get('message')}") """ # Detect protocol type and route to appropriate handler if isinstance(payload, (Task, TaskStatusUpdateEvent)): # A2A webhook (Task or TaskStatusUpdateEvent) return await self._handle_a2a_webhook(payload, task_type, operation_id) else: # MCP webhook (dict payload) return await self._handle_mcp_webhook( payload, task_type, operation_id, signature, timestamp, raw_body )Handle incoming webhook and return typed result.
This method provides a unified interface for handling webhooks from both MCP and A2A protocols:
- MCP Webhooks: HTTP POST with dict payload, optional HMAC signature
- A2A Webhooks: Task or TaskStatusUpdateEvent objects based on status
The method automatically detects the protocol type and routes to the appropriate handler. Both protocols return a consistent TaskResult structure with typed AdCP response data.
Args
payload- Webhook payload - one of: - dict[str, Any]: MCP webhook payload from HTTP POST - Task: A2A webhook for terminated statuses (completed, failed) - TaskStatusUpdateEvent: A2A webhook for intermediate statuses (working, input-required, submitted)
task_type- Task type from application routing (e.g., "get_products"). Applications should extract this from URL routing pattern: /webhook/{task_type}/{agent_id}/{operation_id}
operation_id- Operation identifier from application routing. Used to correlate webhook notifications with original task submission.
signature- Optional HMAC-SHA256 signature for MCP webhook verification (X-AdCP-Signature header). Ignored for A2A webhooks.
timestamp- Optional Unix timestamp (seconds) for MCP webhook signature verification (X-AdCP-Timestamp header). Required when signature is provided.
raw_body- Optional raw HTTP request body bytes for signature verification. When provided, used directly instead of re-serializing the payload, avoiding cross-language JSON serialization mismatches. Strongly recommended for production use.
Returns
TaskResult with parsed task-specific response data. The structure is identical regardless of protocol.
Raises
ADCPWebhookSignatureError- If MCP signature verification fails
ValidationError- If MCP payload doesn't match WebhookPayload schema
Note
task_type and operation_id were deprecated from the webhook payload per AdCP specification. Applications must extract these from URL routing and pass them explicitly.
Examples
MCP webhook (HTTP endpoint):
>>> @app.post("/webhook/{task_type}/{agent_id}/{operation_id}") >>> async def webhook_handler(task_type: str, operation_id: str, request: Request): >>> raw_body = await request.body() >>> payload = json.loads(raw_body) >>> signature = request.headers.get("X-AdCP-Signature") >>> timestamp = request.headers.get("X-AdCP-Timestamp") >>> result = await client.handle_webhook( >>> payload, task_type, operation_id, signature, timestamp, >>> raw_body=raw_body, >>> ) >>> if result.success: >>> print(f"Task completed: {result.data}")A2A webhook with Task (terminated status):
>>> async def on_task_completed(task: Task): >>> # Extract task_type and operation_id from your app's task tracking >>> task_type = your_task_registry.get_type(task.id) >>> operation_id = your_task_registry.get_operation_id(task.id) >>> result = await client.handle_webhook( >>> task, task_type, operation_id >>> ) >>> if result.success: >>> print(f"Task completed: {result.data}")A2A webhook with TaskStatusUpdateEvent (intermediate status):
>>> async def on_task_update(event: TaskStatusUpdateEvent): >>> # Extract task_type and operation_id from your app's task tracking >>> task_type = your_task_registry.get_type(event.task_id) >>> operation_id = your_task_registry.get_operation_id(event.task_id) >>> result = await client.handle_webhook( >>> event, task_type, operation_id >>> ) >>> if result.status == GeneratedTaskStatus.working: >>> print(f"Task still working: {result.metadata.get('message')}") async def list_accounts(self, request: ListAccountsRequest) ‑> TaskResult[ListAccountsResponse]-
Expand source code
async def list_accounts( self, request: ListAccountsRequest, ) -> TaskResult[ListAccountsResponse]: """ List Accounts. Args: request: Request parameters Returns: TaskResult containing ListAccountsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_accounts", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_accounts(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_accounts", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListAccountsResponse)List Accounts.
Args
request- Request parameters
Returns
TaskResult containing ListAccountsResponse
async def list_content_standards(self, request: ListContentStandardsRequest) ‑> TaskResult[Union[ListContentStandardsResponse1, ListContentStandardsResponse2]]-
Expand source code
async def list_content_standards( self, request: ListContentStandardsRequest, ) -> TaskResult[ListContentStandardsResponse]: """ List content standards configurations. Args: request: Request parameters including optional filters Returns: TaskResult containing ListContentStandardsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_content_standards", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_content_standards(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_content_standards", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListContentStandardsResponse)List content standards configurations.
Args
request- Request parameters including optional filters
Returns
TaskResult containing ListContentStandardsResponse
async def list_creative_formats(self,
request: ListCreativeFormatsRequest,
fetch_previews: bool = False,
preview_output_format: str = 'url') ‑> TaskResult[ListCreativeFormatsResponse]-
Expand source code
async def list_creative_formats( self, request: ListCreativeFormatsRequest, fetch_previews: bool = False, preview_output_format: str = "url", ) -> TaskResult[ListCreativeFormatsResponse]: """ List supported creative formats. Args: request: Request parameters fetch_previews: If True, generate preview URLs for each format using sample manifests (uses batch API for 5-10x performance improvement) preview_output_format: "url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead) Returns: TaskResult containing ListCreativeFormatsResponse with optional preview URLs in metadata """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_creative_formats", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_creative_formats(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_creative_formats", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) result: TaskResult[ListCreativeFormatsResponse] = self.adapter._parse_response( raw_result, ListCreativeFormatsResponse ) if fetch_previews and result.success and result.data: from adcp.utils.preview_cache import add_preview_urls_to_formats formats_with_previews = await add_preview_urls_to_formats( result.data.formats, self, use_batch=True, output_format=preview_output_format, ) result.metadata = result.metadata or {} result.metadata["formats_with_previews"] = formats_with_previews return resultList supported creative formats.
Args
request- Request parameters
fetch_previews- If True, generate preview URLs for each format using sample manifests (uses batch API for 5-10x performance improvement)
preview_output_format- "url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead)
Returns
TaskResult containing ListCreativeFormatsResponse with optional preview URLs in metadata
async def list_creatives(self, request: ListCreativesRequest) ‑> TaskResult[ListCreativesResponse]-
Expand source code
async def list_creatives( self, request: ListCreativesRequest, ) -> TaskResult[ListCreativesResponse]: """ List Creatives. Args: request: Request parameters Returns: TaskResult containing ListCreativesResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_creatives", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_creatives(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_creatives", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListCreativesResponse)List Creatives.
Args
request- Request parameters
Returns
TaskResult containing ListCreativesResponse
async def list_property_lists(self, request: ListPropertyListsRequest) ‑> TaskResult[ListPropertyListsResponse]-
Expand source code
async def list_property_lists( self, request: ListPropertyListsRequest, ) -> TaskResult[ListPropertyListsResponse]: """ List property lists owned by a principal. Retrieves metadata for all property lists, optionally filtered by principal or pagination parameters. Args: request: Request parameters with optional filtering Returns: TaskResult containing ListPropertyListsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_property_lists", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_property_lists(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_property_lists", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListPropertyListsResponse)List property lists owned by a principal.
Retrieves metadata for all property lists, optionally filtered by principal or pagination parameters.
Args
request- Request parameters with optional filtering
Returns
TaskResult containing ListPropertyListsResponse
async def list_tools(self) ‑> list[str]-
Expand source code
async def list_tools(self) -> list[str]: """ List available tools from the agent. Returns: List of tool names """ return await self.adapter.list_tools()List available tools from the agent.
Returns
List of tool names
async def log_event(self, request: LogEventRequest) ‑> TaskResult[Union[LogEventResponse1, LogEventResponse2]]-
Expand source code
async def log_event( self, request: LogEventRequest, ) -> TaskResult[LogEventResponse]: """ Log Event. Args: request: Request parameters Returns: TaskResult containing LogEventResponse """ self._validate_task_features("log_event") operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="log_event", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.log_event(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="log_event", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, LogEventResponse)Log Event.
Args
request- Request parameters
Returns
TaskResult containing LogEventResponse
async def preview_creative(self, request: PreviewCreativeRequest) ‑> TaskResult[Union[PreviewCreativeResponse1, PreviewCreativeResponse2, PreviewCreativeResponse3]]-
Expand source code
async def preview_creative( self, request: PreviewCreativeRequest, ) -> TaskResult[PreviewCreativeResponse]: """ Generate preview of a creative manifest. Args: request: Request parameters Returns: TaskResult containing PreviewCreativeResponse with preview URLs """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="preview_creative", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.preview_creative(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="preview_creative", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, PreviewCreativeResponse)Generate preview of a creative manifest.
Args
request- Request parameters
Returns
TaskResult containing PreviewCreativeResponse with preview URLs
async def provide_performance_feedback(self, request: ProvidePerformanceFeedbackRequest) ‑> TaskResult[Union[ProvidePerformanceFeedbackResponse1, ProvidePerformanceFeedbackResponse2]]-
Expand source code
async def provide_performance_feedback( self, request: ProvidePerformanceFeedbackRequest, ) -> TaskResult[ProvidePerformanceFeedbackResponse]: """ Provide Performance Feedback. Args: request: Request parameters Returns: TaskResult containing ProvidePerformanceFeedbackResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="provide_performance_feedback", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.provide_performance_feedback(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="provide_performance_feedback", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ProvidePerformanceFeedbackResponse)Provide Performance Feedback.
Args
request- Request parameters
Returns
TaskResult containing ProvidePerformanceFeedbackResponse
async def refresh_capabilities(self) ‑> GetAdcpCapabilitiesResponse-
Expand source code
async def refresh_capabilities(self) -> GetAdcpCapabilitiesResponse: """Fetch capabilities from the seller, bypassing cache. Returns: The seller's capabilities response. """ result = await self.get_adcp_capabilities(GetAdcpCapabilitiesRequest()) if result.success and result.data is not None: self._capabilities = result.data self._feature_resolver = FeatureResolver(result.data) self._capabilities_fetched_at = time.monotonic() return self._capabilities raise ADCPError( f"Failed to fetch capabilities: {result.error or result.message}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, )Fetch capabilities from the seller, bypassing cache.
Returns
The seller's capabilities response.
async def report_plan_outcome(self, request: ReportPlanOutcomeRequest) ‑> TaskResult[ReportPlanOutcomeResponse]-
Expand source code
async def report_plan_outcome( self, request: ReportPlanOutcomeRequest, ) -> TaskResult[ReportPlanOutcomeResponse]: """Report the outcome of a governed action to the governance agent.""" operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="report_plan_outcome", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.report_plan_outcome(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="report_plan_outcome", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ReportPlanOutcomeResponse)Report the outcome of a governed action to the governance agent.
async def report_usage(self, request: ReportUsageRequest) ‑> TaskResult[ReportUsageResponse]-
Expand source code
async def report_usage( self, request: ReportUsageRequest, ) -> TaskResult[ReportUsageResponse]: """ Report Usage. Args: request: Request parameters Returns: TaskResult containing ReportUsageResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="report_usage", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.report_usage(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="report_usage", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ReportUsageResponse)Report Usage.
Args
request- Request parameters
Returns
TaskResult containing ReportUsageResponse
def require(self, *features: str) ‑> None-
Expand source code
def require(self, *features: str) -> None: """Assert that the seller supports all listed features. Args: *features: Feature identifiers to require. Raises: ADCPFeatureUnsupportedError: If any features are not supported. ADCPError: If capabilities have not been fetched yet. """ self._ensure_resolver().require( *features, agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, )Assert that the seller supports all listed features.
Args
*features- Feature identifiers to require.
Raises
ADCPFeatureUnsupportedError- If any features are not supported.
ADCPError- If capabilities have not been fetched yet.
async def si_get_offering(self, request: SiGetOfferingRequest) ‑> TaskResult[SiGetOfferingResponse]-
Expand source code
async def si_get_offering( self, request: SiGetOfferingRequest, ) -> TaskResult[SiGetOfferingResponse]: """ Get sponsored intelligence offering. Retrieves product/service offerings that can be presented in a sponsored intelligence session. Args: request: Request parameters including brand context Returns: TaskResult containing SiGetOfferingResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_get_offering", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.si_get_offering(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_get_offering", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SiGetOfferingResponse)Get sponsored intelligence offering.
Retrieves product/service offerings that can be presented in a sponsored intelligence session.
Args
request- Request parameters including brand context
Returns
TaskResult containing SiGetOfferingResponse
async def si_initiate_session(self, request: SiInitiateSessionRequest) ‑> TaskResult[SiInitiateSessionResponse]-
Expand source code
async def si_initiate_session( self, request: SiInitiateSessionRequest, ) -> TaskResult[SiInitiateSessionResponse]: """ Initiate a sponsored intelligence session. Starts a conversational brand experience session with a user. Args: request: Request parameters including identity and context Returns: TaskResult containing SiInitiateSessionResponse with session_id """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_initiate_session", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.si_initiate_session(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_initiate_session", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SiInitiateSessionResponse)Initiate a sponsored intelligence session.
Starts a conversational brand experience session with a user.
Args
request- Request parameters including identity and context
Returns
TaskResult containing SiInitiateSessionResponse with session_id
async def si_send_message(self, request: SiSendMessageRequest) ‑> TaskResult[SiSendMessageResponse]-
Expand source code
async def si_send_message( self, request: SiSendMessageRequest, ) -> TaskResult[SiSendMessageResponse]: """ Send a message in a sponsored intelligence session. Continues the conversation in an active SI session. Args: request: Request parameters including session_id and message Returns: TaskResult containing SiSendMessageResponse with brand response """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_send_message", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.si_send_message(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_send_message", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SiSendMessageResponse)Send a message in a sponsored intelligence session.
Continues the conversation in an active SI session.
Args
request- Request parameters including session_id and message
Returns
TaskResult containing SiSendMessageResponse with brand response
async def si_terminate_session(self, request: SiTerminateSessionRequest) ‑> TaskResult[SiTerminateSessionResponse]-
Expand source code
async def si_terminate_session( self, request: SiTerminateSessionRequest, ) -> TaskResult[SiTerminateSessionResponse]: """ Terminate a sponsored intelligence session. Ends an active SI session, optionally with follow-up actions. Args: request: Request parameters including session_id and termination context Returns: TaskResult containing SiTerminateSessionResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_terminate_session", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.si_terminate_session(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_terminate_session", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SiTerminateSessionResponse)Terminate a sponsored intelligence session.
Ends an active SI session, optionally with follow-up actions.
Args
request- Request parameters including session_id and termination context
Returns
TaskResult containing SiTerminateSessionResponse
def supports(self, feature: str) ‑> bool-
Expand source code
def supports(self, feature: str) -> bool: """Check if the seller supports a feature. Supports multiple feature namespaces: - Protocol support: ``supports("media_buy")`` checks ``supported_protocols`` - Extension support: ``supports("ext:scope3")`` checks ``extensions_supported`` - Targeting: ``supports("targeting.geo_countries")`` checks ``media_buy.execution.targeting`` - Media buy features: ``supports("audience_targeting")`` checks ``media_buy.features`` - Signals features: ``supports("catalog_signals")`` checks ``signals.features`` Args: feature: Feature identifier to check. Returns: True if the seller declares the feature as supported. Raises: ADCPError: If capabilities have not been fetched yet. """ return self._ensure_resolver().supports(feature)Check if the seller supports a feature.
Supports multiple feature namespaces: - Protocol support:
supports("media_buy")checkssupported_protocols- Extension support:supports("ext:scope3")checksextensions_supported- Targeting:supports("targeting.geo_countries")checksmedia_buy.execution.targeting- Media buy features:supports("audience_targeting")checksmedia_buy.features- Signals features:supports("catalog_signals")checkssignals.featuresArgs
feature- Feature identifier to check.
Returns
True if the seller declares the feature as supported.
Raises
ADCPError- If capabilities have not been fetched yet.
async def sync_accounts(self, request: SyncAccountsRequest) ‑> TaskResult[Union[SyncAccountsResponse1, SyncAccountsResponse2]]-
Expand source code
async def sync_accounts( self, request: SyncAccountsRequest, ) -> TaskResult[SyncAccountsResponse]: """ Sync Accounts. Args: request: Request parameters Returns: TaskResult containing SyncAccountsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_accounts", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_accounts(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_accounts", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncAccountsResponse)Sync Accounts.
Args
request- Request parameters
Returns
TaskResult containing SyncAccountsResponse
async def sync_audiences(self, request: SyncAudiencesRequest) ‑> TaskResult[Union[SyncAudiencesResponse1, SyncAudiencesResponse2]]-
Expand source code
async def sync_audiences( self, request: SyncAudiencesRequest, ) -> TaskResult[SyncAudiencesResponse]: """ Sync Audiences. Args: request: Request parameters Returns: TaskResult containing SyncAudiencesResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_audiences", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_audiences(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_audiences", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncAudiencesResponse)Sync Audiences.
Args
request- Request parameters
Returns
TaskResult containing SyncAudiencesResponse
async def sync_catalogs(self, request: SyncCatalogsRequest) ‑> TaskResult[Union[SyncCatalogsResponse1, SyncCatalogsResponse2]]-
Expand source code
async def sync_catalogs( self, request: SyncCatalogsRequest, ) -> TaskResult[SyncCatalogsResponse]: """ Sync Catalogs. Args: request: Request parameters Returns: TaskResult containing SyncCatalogsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_catalogs", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_catalogs(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_catalogs", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncCatalogsResponse)Sync Catalogs.
Args
request- Request parameters
Returns
TaskResult containing SyncCatalogsResponse
async def sync_creatives(self, request: SyncCreativesRequest) ‑> TaskResult[Union[SyncCreativesResponse1, SyncCreativesResponse2]]-
Expand source code
async def sync_creatives( self, request: SyncCreativesRequest, ) -> TaskResult[SyncCreativesResponse]: """ Sync Creatives. Args: request: Request parameters Returns: TaskResult containing SyncCreativesResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_creatives", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_creatives(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_creatives", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncCreativesResponse)Sync Creatives.
Args
request- Request parameters
Returns
TaskResult containing SyncCreativesResponse
async def sync_event_sources(self, request: SyncEventSourcesRequest) ‑> TaskResult[Union[SyncEventSourcesResponse1, SyncEventSourcesResponse2]]-
Expand source code
async def sync_event_sources( self, request: SyncEventSourcesRequest, ) -> TaskResult[SyncEventSourcesResponse]: """ Sync Event Sources. Args: request: Request parameters Returns: TaskResult containing SyncEventSourcesResponse """ self._validate_task_features("sync_event_sources") operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_event_sources", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_event_sources(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_event_sources", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncEventSourcesResponse)Sync Event Sources.
Args
request- Request parameters
Returns
TaskResult containing SyncEventSourcesResponse
async def sync_plans(self, request: SyncPlansRequest) ‑> TaskResult[SyncPlansResponse]-
Expand source code
async def sync_plans( self, request: SyncPlansRequest, ) -> TaskResult[SyncPlansResponse]: """Sync campaign governance plans to the governance agent.""" operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_plans", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_plans(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_plans", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncPlansResponse)Sync campaign governance plans to the governance agent.
async def update_content_standards(self, request: UpdateContentStandardsRequest) ‑> TaskResult[Union[UpdateContentStandardsResponse1, UpdateContentStandardsResponse2]]-
Expand source code
async def update_content_standards( self, request: UpdateContentStandardsRequest, ) -> TaskResult[UpdateContentStandardsResponse]: """ Update a content standards configuration. Args: request: Request parameters including standards_id and updates Returns: TaskResult containing UpdateContentStandardsResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_content_standards", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.update_content_standards(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_content_standards", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, UpdateContentStandardsResponse)Update a content standards configuration.
Args
request- Request parameters including standards_id and updates
Returns
TaskResult containing UpdateContentStandardsResponse
async def update_media_buy(self, request: UpdateMediaBuyRequest) ‑> TaskResult[Union[UpdateMediaBuyResponse1, UpdateMediaBuyResponse2]]-
Expand source code
async def update_media_buy( self, request: UpdateMediaBuyRequest, ) -> TaskResult[UpdateMediaBuyResponse]: """ Update an existing media buy reservation. Modifies a previously created media buy by updating packages or publisher properties. The update operation uses discriminated unions to specify what to change - either package details or targeting properties. Args: request: Media buy update parameters including: - media_buy_id: Identifier from create_media_buy response - updates: Discriminated union specifying update type: * UpdateMediaBuyPackagesRequest: Modify package selections * UpdateMediaBuyPropertiesRequest: Change targeting properties Returns: TaskResult containing UpdateMediaBuyResponse with: - media_buy_id: The updated media buy identifier - status: Updated state of the media buy - packages: Updated package configurations - Additional platform-specific metadata Example: >>> from adcp import ADCPClient, UpdateMediaBuyPackagesRequest >>> client = ADCPClient(agent_config) >>> request = UpdateMediaBuyPackagesRequest( ... media_buy_id="mb_123", ... packages=[updated_package] ... ) >>> result = await client.update_media_buy(request) >>> if result.success: ... updated_packages = result.data.packages """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_media_buy", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.update_media_buy(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_media_buy", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, UpdateMediaBuyResponse)Update an existing media buy reservation.
Modifies a previously created media buy by updating packages or publisher properties. The update operation uses discriminated unions to specify what to change - either package details or targeting properties.
Args
request- Media buy update parameters including: - media_buy_id: Identifier from create_media_buy response - updates: Discriminated union specifying update type: * UpdateMediaBuyPackagesRequest: Modify package selections * UpdateMediaBuyPropertiesRequest: Change targeting properties
Returns
TaskResult containing UpdateMediaBuyResponse with: - media_buy_id: The updated media buy identifier - status: Updated state of the media buy - packages: Updated package configurations - Additional platform-specific metadata
Example
>>> from adcp import ADCPClient, UpdateMediaBuyPackagesRequest >>> client = ADCPClient(agent_config) >>> request = UpdateMediaBuyPackagesRequest( ... media_buy_id="mb_123", ... packages=[updated_package] ... ) >>> result = await client.update_media_buy(request) >>> if result.success: ... updated_packages = result.data.packages async def update_property_list(self, request: UpdatePropertyListRequest) ‑> TaskResult[UpdatePropertyListResponse]-
Expand source code
async def update_property_list( self, request: UpdatePropertyListRequest, ) -> TaskResult[UpdatePropertyListResponse]: """ Update a property list. Modifies the filters, brand manifest, or other parameters of an existing property list. Args: request: Request parameters with list_id and updates Returns: TaskResult containing UpdatePropertyListResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_property_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.update_property_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_property_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, UpdatePropertyListResponse)Update a property list.
Modifies the filters, brand manifest, or other parameters of an existing property list.
Args
request- Request parameters with list_id and updates
Returns
TaskResult containing UpdatePropertyListResponse
async def validate_content_delivery(self, request: ValidateContentDeliveryRequest) ‑> TaskResult[Union[ValidateContentDeliveryResponse1, ValidateContentDeliveryResponse2]]-
Expand source code
async def validate_content_delivery( self, request: ValidateContentDeliveryRequest, ) -> TaskResult[ValidateContentDeliveryResponse]: """ Validate content delivery against standards. Validates that ad delivery records comply with content standards. Args: request: Request parameters including delivery records Returns: TaskResult containing ValidateContentDeliveryResponse """ operation_id = create_operation_id() params = request.model_dump(exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="validate_content_delivery", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.validate_content_delivery(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="validate_content_delivery", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ValidateContentDeliveryResponse)Validate content delivery against standards.
Validates that ad delivery records comply with content standards.
Args
request- Request parameters including delivery records
Returns
TaskResult containing ValidateContentDeliveryResponse
class ADCPMultiAgentClient (agents: list[AgentConfig],
webhook_url_template: str | None = None,
webhook_secret: str | None = None,
on_activity: Callable[[Activity], None] | None = None,
handlers: dict[str, Callable[..., Any]] | None = None)-
Expand source code
class ADCPMultiAgentClient: """Client for managing multiple AdCP agents.""" def __init__( self, agents: list[AgentConfig], webhook_url_template: str | None = None, webhook_secret: str | None = None, on_activity: Callable[[Activity], None] | None = None, handlers: dict[str, Callable[..., Any]] | None = None, ): """ Initialize multi-agent client. Args: agents: List of agent configurations webhook_url_template: Template for webhook URLs webhook_secret: Secret for webhook verification on_activity: Callback for activity events handlers: Task completion handlers """ self.agents = { agent.id: ADCPClient( agent, webhook_url_template=webhook_url_template, webhook_secret=webhook_secret, on_activity=on_activity, ) for agent in agents } self.handlers = handlers or {} def agent(self, agent_id: str) -> ADCPClient: """Get client for specific agent.""" if agent_id not in self.agents: raise ValueError(f"Agent not found: {agent_id}") return self.agents[agent_id] @property def agent_ids(self) -> list[str]: """Get list of agent IDs.""" return list(self.agents.keys()) async def close(self) -> None: """Close all agent clients and clean up resources.""" import asyncio logger.debug("Closing all agent clients in multi-agent client") close_tasks = [client.close() for client in self.agents.values()] await asyncio.gather(*close_tasks, return_exceptions=True) async def __aenter__(self) -> ADCPMultiAgentClient: """Async context manager entry.""" return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """Async context manager exit.""" await self.close() async def get_products( self, request: GetProductsRequest, ) -> list[TaskResult[GetProductsResponse]]: """ Execute get_products across all agents in parallel. Args: request: Request parameters Returns: List of TaskResults containing GetProductsResponse for each agent """ import asyncio tasks = [agent.get_products(request) for agent in self.agents.values()] return await asyncio.gather(*tasks) @classmethod def from_env(cls) -> ADCPMultiAgentClient: """Create client from environment variables.""" agents_json = os.getenv("ADCP_AGENTS") if not agents_json: raise ValueError("ADCP_AGENTS environment variable not set") agents_data = json.loads(agents_json) agents = [AgentConfig(**agent) for agent in agents_data] return cls( agents=agents, webhook_url_template=os.getenv("WEBHOOK_URL_TEMPLATE"), webhook_secret=os.getenv("WEBHOOK_SECRET"), )Client for managing multiple AdCP agents.
Initialize multi-agent client.
Args
agents- List of agent configurations
webhook_url_template- Template for webhook URLs
webhook_secret- Secret for webhook verification
on_activity- Callback for activity events
handlers- Task completion handlers
Static methods
def from_env() ‑> ADCPMultiAgentClient-
Create client from environment variables.
Instance variables
prop agent_ids : list[str]-
Expand source code
@property def agent_ids(self) -> list[str]: """Get list of agent IDs.""" return list(self.agents.keys())Get list of agent IDs.
Methods
def agent(self, agent_id: str) ‑> ADCPClient-
Expand source code
def agent(self, agent_id: str) -> ADCPClient: """Get client for specific agent.""" if agent_id not in self.agents: raise ValueError(f"Agent not found: {agent_id}") return self.agents[agent_id]Get client for specific agent.
async def close(self) ‑> None-
Expand source code
async def close(self) -> None: """Close all agent clients and clean up resources.""" import asyncio logger.debug("Closing all agent clients in multi-agent client") close_tasks = [client.close() for client in self.agents.values()] await asyncio.gather(*close_tasks, return_exceptions=True)Close all agent clients and clean up resources.
async def get_products(self, request: GetProductsRequest) ‑> list[TaskResult[GetProductsResponse]]-
Expand source code
async def get_products( self, request: GetProductsRequest, ) -> list[TaskResult[GetProductsResponse]]: """ Execute get_products across all agents in parallel. Args: request: Request parameters Returns: List of TaskResults containing GetProductsResponse for each agent """ import asyncio tasks = [agent.get_products(request) for agent in self.agents.values()] return await asyncio.gather(*tasks)Execute get_products across all agents in parallel.
Args
request- Request parameters
Returns
List of TaskResults containing GetProductsResponse for each agent