Module adcp.client
Classes
class ADCPClient (agent_config: AgentConfig,
webhook_url_template: str | None = None,
webhook_secret: str | None = None,
on_activity: Callable[[Activity], None] | None = None)-
Expand source code
class ADCPClient:
    """Client for interacting with a single AdCP agent."""

    def __init__(
        self,
        agent_config: AgentConfig,
        webhook_url_template: str | None = None,
        webhook_secret: str | None = None,
        on_activity: Callable[[Activity], None] | None = None,
    ):
        """
        Initialize ADCP client for a single agent.

        Args:
            agent_config: Agent configuration
            webhook_url_template: Template for webhook URLs with
                {agent_id}, {task_type}, {operation_id}
            webhook_secret: Secret for webhook signature verification
            on_activity: Callback for activity events

        Raises:
            ValueError: If ``agent_config.protocol`` is neither A2A nor MCP
        """
        self.agent_config = agent_config
        self.webhook_url_template = webhook_url_template
        self.webhook_secret = webhook_secret
        self.on_activity = on_activity

        # Initialize protocol adapter
        self.adapter: ProtocolAdapter
        if agent_config.protocol == Protocol.A2A:
            self.adapter = A2AAdapter(agent_config)
        elif agent_config.protocol == Protocol.MCP:
            self.adapter = MCPAdapter(agent_config)
        else:
            raise ValueError(f"Unsupported protocol: {agent_config.protocol}")

        # Initialize simple API accessor (lazy import to avoid circular dependency)
        from adcp.simple import SimpleAPI

        self.simple = SimpleAPI(self)

    def get_webhook_url(self, task_type: str, operation_id: str) -> str:
        """
        Generate webhook URL for a task.

        Args:
            task_type: Value substituted for ``{task_type}`` in the template
            operation_id: Value substituted for ``{operation_id}`` in the template

        Returns:
            The template with ``{agent_id}``, ``{task_type}`` and
            ``{operation_id}`` filled in

        Raises:
            ValueError: If no webhook_url_template was configured
        """
        if not self.webhook_url_template:
            raise ValueError("webhook_url_template not configured")

        return self.webhook_url_template.format(
            agent_id=self.agent_config.id,
            task_type=task_type,
            operation_id=operation_id,
        )

    def _emit_activity(self, activity: Activity) -> None:
        """Pass ``activity`` to the ``on_activity`` callback, if one is set."""
        if self.on_activity:
            self.on_activity(activity)

    async def _run_task(
        self,
        task_type: str,
        request: Any,
        response_type: Any,
    ) -> TaskResult[Any]:
        """
        Run one request/response round trip against the agent.

        Emits a PROTOCOL_REQUEST activity, awaits the adapter method named
        ``task_type`` with the serialized request, emits a PROTOCOL_RESPONSE
        activity carrying the raw status, and parses the raw result.

        Args:
            task_type: Name of the adapter method to call; also recorded as
                the ``task_type`` of both activities
            request: Request model; serialized with ``None`` fields omitted
            response_type: Type handed to the adapter's response parser

        Returns:
            The TaskResult produced by ``self.adapter._parse_response``
        """
        operation_id = create_operation_id()
        params = request.model_dump(exclude_none=True)

        self._emit_activity(
            Activity(
                type=ActivityType.PROTOCOL_REQUEST,
                operation_id=operation_id,
                agent_id=self.agent_config.id,
                task_type=task_type,
                timestamp=datetime.now(timezone.utc).isoformat(),
            )
        )

        # The adapter method is resolved only after the request activity has
        # been emitted, so a missing method fails at the same point it would
        # with a direct ``self.adapter.<task_type>(params)`` call.
        adapter_call = getattr(self.adapter, task_type)
        raw_result = await adapter_call(params)

        self._emit_activity(
            Activity(
                type=ActivityType.PROTOCOL_RESPONSE,
                operation_id=operation_id,
                agent_id=self.agent_config.id,
                task_type=task_type,
                status=raw_result.status,
                timestamp=datetime.now(timezone.utc).isoformat(),
            )
        )

        return self.adapter._parse_response(raw_result, response_type)

    async def get_products(
        self,
        request: GetProductsRequest,
        fetch_previews: bool = False,
        preview_output_format: str = "url",
        creative_agent_client: ADCPClient | None = None,
    ) -> TaskResult[GetProductsResponse]:
        """
        Get advertising products.

        Args:
            request: Request parameters
            fetch_previews: If True, generate preview URLs for each product's
                formats (uses batch API for 5-10x performance improvement)
            preview_output_format: "url" for iframe URLs (default), "html" for
                direct embedding (2-3x faster, no iframe overhead)
            creative_agent_client: Client for creative agent
                (required if fetch_previews=True)

        Returns:
            TaskResult containing GetProductsResponse with optional preview
            URLs in metadata

        Raises:
            ValueError: If fetch_previews=True but creative_agent_client is
                not provided
        """
        # Validate before any activity is emitted or any request is sent.
        if fetch_previews and not creative_agent_client:
            raise ValueError("creative_agent_client is required when fetch_previews=True")

        result: TaskResult[GetProductsResponse] = await self._run_task(
            "get_products", request, GetProductsResponse
        )

        if fetch_previews and result.success and result.data and creative_agent_client:
            from adcp.utils.preview_cache import add_preview_urls_to_products

            products_with_previews = await add_preview_urls_to_products(
                result.data.products,
                creative_agent_client,
                use_batch=True,
                output_format=preview_output_format,
            )
            result.metadata = result.metadata or {}
            result.metadata["products_with_previews"] = products_with_previews

        return result

    async def list_creative_formats(
        self,
        request: ListCreativeFormatsRequest,
        fetch_previews: bool = False,
        preview_output_format: str = "url",
    ) -> TaskResult[ListCreativeFormatsResponse]:
        """
        List supported creative formats.

        Args:
            request: Request parameters
            fetch_previews: If True, generate preview URLs for each format
                using sample manifests (uses batch API for 5-10x performance
                improvement)
            preview_output_format: "url" for iframe URLs (default), "html" for
                direct embedding (2-3x faster, no iframe overhead)

        Returns:
            TaskResult containing ListCreativeFormatsResponse with optional
            preview URLs in metadata
        """
        result: TaskResult[ListCreativeFormatsResponse] = await self._run_task(
            "list_creative_formats", request, ListCreativeFormatsResponse
        )

        if fetch_previews and result.success and result.data:
            from adcp.utils.preview_cache import add_preview_urls_to_formats

            # This client itself is passed as the client used for previews.
            formats_with_previews = await add_preview_urls_to_formats(
                result.data.formats,
                self,
                use_batch=True,
                output_format=preview_output_format,
            )
            result.metadata = result.metadata or {}
            result.metadata["formats_with_previews"] = formats_with_previews

        return result

    async def preview_creative(
        self,
        request: PreviewCreativeRequest,
    ) -> TaskResult[PreviewCreativeResponse]:
        """
        Generate preview of a creative manifest.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing PreviewCreativeResponse with preview URLs
        """
        return await self._run_task("preview_creative", request, PreviewCreativeResponse)

    async def sync_creatives(
        self,
        request: SyncCreativesRequest,
    ) -> TaskResult[SyncCreativesResponse]:
        """
        Sync Creatives.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing SyncCreativesResponse
        """
        return await self._run_task("sync_creatives", request, SyncCreativesResponse)

    async def list_creatives(
        self,
        request: ListCreativesRequest,
    ) -> TaskResult[ListCreativesResponse]:
        """
        List Creatives.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing ListCreativesResponse
        """
        return await self._run_task("list_creatives", request, ListCreativesResponse)

    async def get_media_buy_delivery(
        self,
        request: GetMediaBuyDeliveryRequest,
    ) -> TaskResult[GetMediaBuyDeliveryResponse]:
        """
        Get Media Buy Delivery.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing GetMediaBuyDeliveryResponse
        """
        return await self._run_task(
            "get_media_buy_delivery", request, GetMediaBuyDeliveryResponse
        )

    async def list_authorized_properties(
        self,
        request: ListAuthorizedPropertiesRequest,
    ) -> TaskResult[ListAuthorizedPropertiesResponse]:
        """
        List Authorized Properties.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing ListAuthorizedPropertiesResponse
        """
        return await self._run_task(
            "list_authorized_properties", request, ListAuthorizedPropertiesResponse
        )

    async def get_signals(
        self,
        request: GetSignalsRequest,
    ) -> TaskResult[GetSignalsResponse]:
        """
        Get Signals.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing GetSignalsResponse
        """
        return await self._run_task("get_signals", request, GetSignalsResponse)

    async def activate_signal(
        self,
        request: ActivateSignalRequest,
    ) -> TaskResult[ActivateSignalResponse]:
        """
        Activate Signal.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing ActivateSignalResponse
        """
        return await self._run_task("activate_signal", request, ActivateSignalResponse)

    async def provide_performance_feedback(
        self,
        request: ProvidePerformanceFeedbackRequest,
    ) -> TaskResult[ProvidePerformanceFeedbackResponse]:
        """
        Provide Performance Feedback.

        Args:
            request: Request parameters

        Returns:
            TaskResult containing ProvidePerformanceFeedbackResponse
        """
        return await self._run_task(
            "provide_performance_feedback", request, ProvidePerformanceFeedbackResponse
        )

    async def list_tools(self) -> list[str]:
        """
        List available tools from the agent.

        Returns:
            List of tool names
        """
        return await self.adapter.list_tools()

    async def close(self) -> None:
        """Close the adapter and clean up resources."""
        # Only adapters that expose a ``close`` attribute are closed.
        if hasattr(self.adapter, "close"):
            logger.debug(f"Closing adapter for agent {self.agent_config.id}")
            await self.adapter.close()

    async def __aenter__(self) -> ADCPClient:
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Async context manager exit; closes the adapter."""
        await self.close()

    def _verify_webhook_signature(self, payload: dict[str, Any], signature: str) -> bool:
        """
        Verify HMAC-SHA256 signature of webhook payload.

        The payload is serialized as compact JSON with sorted keys, and the
        hex digest of its HMAC-SHA256 under ``webhook_secret`` is compared
        with ``signature``.

        Args:
            payload: Webhook payload dict
            signature: Signature to verify

        Returns:
            True if signature is valid (or no webhook_secret is configured),
            False otherwise
        """
        if not self.webhook_secret:
            return True

        payload_bytes = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8")
        expected_signature = hmac.new(
            self.webhook_secret.encode("utf-8"), payload_bytes, hashlib.sha256
        ).hexdigest()

        # compare_digest raises TypeError for str arguments containing
        # non-ASCII characters; comparing bytes makes such a signature a
        # plain mismatch instead of an unhandled exception.
        return hmac.compare_digest(
            signature.encode("utf-8"), expected_signature.encode("utf-8")
        )

    def _parse_webhook_result(self, webhook: WebhookPayload) -> TaskResult[Any]:
        """
        Parse webhook payload into typed TaskResult based on task_type.

        Args:
            webhook: Validated webhook payload

        Returns:
            TaskResult with task-specific typed response data. When the task
            is not completed, has no result, has no mapped response type, or
            parsing raises ValueError, the raw ``webhook.result`` is returned
            as ``data`` instead.
        """
        from adcp.utils.response_parser import parse_json_or_text

        # Map task types to their response types (using string literals, not enum)
        # Note: Some response types are Union types
        # (e.g., ActivateSignalResponse = Success | Error)
        response_type_map: dict[str, type[BaseModel] | Any] = {
            "get_products": GetProductsResponse,
            "list_creative_formats": ListCreativeFormatsResponse,
            "sync_creatives": SyncCreativesResponse,  # Union type
            "list_creatives": ListCreativesResponse,
            "get_media_buy_delivery": GetMediaBuyDeliveryResponse,
            "list_authorized_properties": ListAuthorizedPropertiesResponse,
            "get_signals": GetSignalsResponse,
            "activate_signal": ActivateSignalResponse,  # Union type
            "provide_performance_feedback": ProvidePerformanceFeedbackResponse,
        }

        # Handle completed tasks with result parsing
        if webhook.status == GeneratedTaskStatus.completed and webhook.result is not None:
            response_type = response_type_map.get(webhook.task_type.value)
            if response_type:
                try:
                    parsed_result: Any = parse_json_or_text(webhook.result, response_type)
                    return TaskResult[Any](
                        status=TaskStatus.COMPLETED,
                        data=parsed_result,
                        success=True,
                        metadata={
                            "task_id": webhook.task_id,
                            "operation_id": webhook.operation_id,
                            "timestamp": webhook.timestamp,
                            "message": webhook.message,
                        },
                    )
                except ValueError as e:
                    logger.warning(f"Failed to parse webhook result: {e}")
                    # Fall through to untyped result

        # Handle failed, input-required, or unparseable results
        # Convert webhook status to core TaskStatus enum
        # Map generated enum values to core enum values
        status_map = {
            GeneratedTaskStatus.completed: TaskStatus.COMPLETED,
            GeneratedTaskStatus.submitted: TaskStatus.SUBMITTED,
            GeneratedTaskStatus.working: TaskStatus.WORKING,
            GeneratedTaskStatus.failed: TaskStatus.FAILED,
            GeneratedTaskStatus.input_required: TaskStatus.NEEDS_INPUT,
        }
        # Statuses missing from the map are reported as FAILED.
        task_status = status_map.get(webhook.status, TaskStatus.FAILED)

        return TaskResult[Any](
            status=task_status,
            data=webhook.result,
            success=webhook.status == GeneratedTaskStatus.completed,
            # Only string errors are surfaced; any other error value becomes None.
            error=webhook.error if isinstance(webhook.error, str) else None,
            metadata={
                "task_id": webhook.task_id,
                "operation_id": webhook.operation_id,
                "timestamp": webhook.timestamp,
                "message": webhook.message,
                "context_id": webhook.context_id,
                "progress": webhook.progress,
            },
        )

    async def handle_webhook(
        self,
        payload: dict[str, Any],
        signature: str | None = None,
    ) -> TaskResult[Any]:
        """
        Handle incoming webhook and return typed result.

        This method:
        1. Verifies webhook signature (if provided)
        2. Validates payload against WebhookPayload schema
        3. Parses task-specific result data into typed response
        4. Emits activity for monitoring

        Args:
            payload: Webhook payload dict
            signature: Optional HMAC-SHA256 signature for verification. Any
                value other than None is verified, including an empty string.

        Returns:
            TaskResult with parsed task-specific response data

        Raises:
            ADCPWebhookSignatureError: If signature verification fails
            ValidationError: If payload doesn't match WebhookPayload schema

        Example:
            >>> result = await client.handle_webhook(payload, signature)
            >>> if result.success and isinstance(result.data, GetProductsResponse):
            ...     print(f"Found {len(result.data.products)} products")
        """
        # Verify signature before processing. The check is ``is not None``
        # rather than truthiness so that an empty signature (for example a
        # missing header defaulted to "") is rejected instead of skipping
        # verification.
        if signature is not None:
            if not self._verify_webhook_signature(payload, signature):
                logger.warning(
                    f"Webhook signature verification failed for agent {self.agent_config.id}"
                )
                raise ADCPWebhookSignatureError("Invalid webhook signature")
        elif self.webhook_secret:
            # NOTE(review): a secret is configured but the caller supplied no
            # signature, so this webhook is processed unauthenticated.
            logger.warning(
                f"Webhook for agent {self.agent_config.id} received without a signature; "
                "skipping verification"
            )

        # Validate and parse webhook payload
        webhook = WebhookPayload.model_validate(payload)

        # Emit activity for monitoring
        self._emit_activity(
            Activity(
                type=ActivityType.WEBHOOK_RECEIVED,
                operation_id=webhook.operation_id or "unknown",
                agent_id=self.agent_config.id,
                task_type=webhook.task_type.value,
                timestamp=datetime.now(timezone.utc).isoformat(),
                metadata={"payload": payload},
            )
        )

        # Parse and return typed result
        return self._parse_webhook_result(webhook)
Initialize ADCP client for a single agent.
Args
agent_config- Agent configuration
webhook_url_template- Template for webhook URLs with {agent_id}, {task_type}, {operation_id}
webhook_secret- Secret for webhook signature verification
on_activity- Callback for activity events
Methods
async def activate_signal(self, request: ActivateSignalRequest) ‑> TaskResult[ActivateSignalResponse]-
Expand source code
async def activate_signal(
    self,
    request: ActivateSignalRequest,
) -> TaskResult[ActivateSignalResponse]:
    """
    Send an ``activate_signal`` task to the agent.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw status) after it; both
    share one freshly created operation id.

    Args:
        request: Request parameters; serialized with ``None`` fields omitted

    Returns:
        TaskResult containing ActivateSignalResponse
    """
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type="activate_signal",
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.activate_signal(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type="activate_signal",
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, ActivateSignalResponse)
Args
request- Request parameters
Returns
TaskResult containing ActivateSignalResponse
async def close(self) ‑> None-
Expand source code
async def close(self) -> None:
    """
    Close the adapter and release its resources.

    Does nothing when the adapter has no ``close`` attribute.
    """
    adapter = self.adapter
    if not hasattr(adapter, "close"):
        return

    logger.debug(f"Closing adapter for agent {self.agent_config.id}")
    await adapter.close()
async def get_media_buy_delivery(self, request: GetMediaBuyDeliveryRequest) ‑> TaskResult[GetMediaBuyDeliveryResponse]-
Expand source code
async def get_media_buy_delivery(
    self,
    request: GetMediaBuyDeliveryRequest,
) -> TaskResult[GetMediaBuyDeliveryResponse]:
    """
    Send a ``get_media_buy_delivery`` task to the agent.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw status) after it; both
    share one freshly created operation id.

    Args:
        request: Request parameters; serialized with ``None`` fields omitted

    Returns:
        TaskResult containing GetMediaBuyDeliveryResponse
    """
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type="get_media_buy_delivery",
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.get_media_buy_delivery(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type="get_media_buy_delivery",
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, GetMediaBuyDeliveryResponse)
Args
request- Request parameters
Returns
TaskResult containing GetMediaBuyDeliveryResponse
async def get_products(self,
request: GetProductsRequest,
fetch_previews: bool = False,
preview_output_format: str = 'url',
creative_agent_client: ADCPClient | None = None) ‑> TaskResult[GetProductsResponse]-
Expand source code
async def get_products(
    self,
    request: GetProductsRequest,
    fetch_previews: bool = False,
    preview_output_format: str = "url",
    creative_agent_client: ADCPClient | None = None,
) -> TaskResult[GetProductsResponse]:
    """
    Fetch advertising products from the agent, optionally with previews.

    Args:
        request: Request parameters; serialized with ``None`` fields omitted
        fetch_previews: If True, generate preview URLs for each product's
            formats (uses batch API for 5-10x performance improvement)
        preview_output_format: "url" for iframe URLs (default), "html" for
            direct embedding (2-3x faster, no iframe overhead)
        creative_agent_client: Client for creative agent
            (required if fetch_previews=True)

    Returns:
        TaskResult containing GetProductsResponse; when previews were
        generated they are stored under ``metadata["products_with_previews"]``

    Raises:
        ValueError: If fetch_previews=True but creative_agent_client is
            not provided
    """
    # Reject the invalid argument combination before anything is emitted or sent.
    if fetch_previews and not creative_agent_client:
        raise ValueError("creative_agent_client is required when fetch_previews=True")

    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type="get_products",
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.get_products(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type="get_products",
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    parsed: TaskResult[GetProductsResponse] = self.adapter._parse_response(
        raw, GetProductsResponse
    )

    # Previews are only attached to a successful, non-empty result.
    if not (fetch_previews and parsed.success and parsed.data and creative_agent_client):
        return parsed

    from adcp.utils.preview_cache import add_preview_urls_to_products

    previews = await add_preview_urls_to_products(
        parsed.data.products,
        creative_agent_client,
        use_batch=True,
        output_format=preview_output_format,
    )
    parsed.metadata = parsed.metadata or {}
    parsed.metadata["products_with_previews"] = previews
    return parsed
Args
request- Request parameters
fetch_previews- If True, generate preview URLs for each product's formats (uses batch API for 5-10x performance improvement)
preview_output_format- "url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead)
creative_agent_client- Client for creative agent (required if fetch_previews=True)
Returns
TaskResult containing GetProductsResponse with optional preview URLs in metadata
Raises
ValueError- If fetch_previews=True but creative_agent_client is not provided
async def get_signals(self, request: GetSignalsRequest) ‑> TaskResult[GetSignalsResponse]-
Expand source code
async def get_signals(
    self,
    request: GetSignalsRequest,
) -> TaskResult[GetSignalsResponse]:
    """
    Send a ``get_signals`` task to the agent.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw status) after it; both
    share one freshly created operation id.

    Args:
        request: Request parameters; serialized with ``None`` fields omitted

    Returns:
        TaskResult containing GetSignalsResponse
    """
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type="get_signals",
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.get_signals(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type="get_signals",
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, GetSignalsResponse)
Args
request- Request parameters
Returns
TaskResult containing GetSignalsResponse
def get_webhook_url(self, task_type: str, operation_id: str) ‑> str-
Expand source code
def get_webhook_url(self, task_type: str, operation_id: str) -> str:
    """
    Build the webhook URL for a task from the configured template.

    Args:
        task_type: Value substituted for ``{task_type}``
        operation_id: Value substituted for ``{operation_id}``

    Returns:
        The template with ``{agent_id}`` (taken from the agent config),
        ``{task_type}`` and ``{operation_id}`` filled in

    Raises:
        ValueError: If no webhook_url_template is configured
    """
    template = self.webhook_url_template
    if template:
        return template.format(
            agent_id=self.agent_config.id,
            task_type=task_type,
            operation_id=operation_id,
        )
    raise ValueError("webhook_url_template not configured")
async def handle_webhook(self, payload: dict[str, Any], signature: str | None = None) ‑> TaskResult[Any]-
Expand source code
async def handle_webhook(
    self,
    payload: dict[str, Any],
    signature: str | None = None,
) -> TaskResult[Any]:
    """
    Handle incoming webhook and return typed result.

    This method:
    1. Verifies webhook signature (if provided)
    2. Validates payload against WebhookPayload schema
    3. Parses task-specific result data into typed response
    4. Emits activity for monitoring

    Args:
        payload: Webhook payload dict
        signature: Optional HMAC-SHA256 signature for verification. Any
            value other than None is verified, including an empty string.

    Returns:
        TaskResult with parsed task-specific response data

    Raises:
        ADCPWebhookSignatureError: If signature verification fails
        ValidationError: If payload doesn't match WebhookPayload schema

    Example:
        >>> result = await client.handle_webhook(payload, signature)
        >>> if result.success and isinstance(result.data, GetProductsResponse):
        ...     print(f"Found {len(result.data.products)} products")
    """
    # Verify signature before processing. The check is ``is not None`` rather
    # than truthiness so that an empty signature (for example a missing
    # header defaulted to "") is rejected instead of skipping verification.
    if signature is not None:
        if not self._verify_webhook_signature(payload, signature):
            logger.warning(
                f"Webhook signature verification failed for agent {self.agent_config.id}"
            )
            raise ADCPWebhookSignatureError("Invalid webhook signature")
    elif self.webhook_secret:
        # NOTE(review): a secret is configured but the caller supplied no
        # signature, so this webhook is processed unauthenticated.
        logger.warning(
            f"Webhook for agent {self.agent_config.id} received without a signature; "
            "skipping verification"
        )

    # Validate and parse webhook payload
    webhook = WebhookPayload.model_validate(payload)

    # Emit activity for monitoring
    self._emit_activity(
        Activity(
            type=ActivityType.WEBHOOK_RECEIVED,
            operation_id=webhook.operation_id or "unknown",
            agent_id=self.agent_config.id,
            task_type=webhook.task_type.value,
            timestamp=datetime.now(timezone.utc).isoformat(),
            metadata={"payload": payload},
        )
    )

    # Parse and return typed result
    return self._parse_webhook_result(webhook)
This method: 1. Verifies webhook signature (if provided) 2. Validates payload against WebhookPayload schema 3. Parses task-specific result data into typed response 4. Emits activity for monitoring
Args
payload- Webhook payload dict
signature- Optional HMAC-SHA256 signature for verification
Returns
TaskResult with parsed task-specific response data
Raises
ADCPWebhookSignatureError- If signature verification fails
ValidationError- If payload doesn't match WebhookPayload schema
Example
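>>> result = await client.handle_webhook(payload, signature)
>>> if result.success and isinstance(result.data, GetProductsResponse):
...     print(f"Found {len(result.data.products)} products")
async def list_authorized_properties(self, request: ListAuthorizedPropertiesRequest) ‑> TaskResult[ListAuthorizedPropertiesResponse]-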
Expand source code
async def list_authorized_properties(
    self,
    request: ListAuthorizedPropertiesRequest,
) -> TaskResult[ListAuthorizedPropertiesResponse]:
    """
    Send a ``list_authorized_properties`` task to the agent.

    A PROTOCOL_REQUEST activity is emitted before the adapter call and a
    PROTOCOL_RESPONSE activity (carrying the raw status) after it; both
    share one freshly created operation id.

    Args:
        request: Request parameters; serialized with ``None`` fields omitted

    Returns:
        TaskResult containing ListAuthorizedPropertiesResponse
    """
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type="list_authorized_properties",
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.list_authorized_properties(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type="list_authorized_properties",
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    return self.adapter._parse_response(raw, ListAuthorizedPropertiesResponse)
Args
request- Request parameters
Returns
TaskResult containing ListAuthorizedPropertiesResponse
async def list_creative_formats(self,
request: ListCreativeFormatsRequest,
fetch_previews: bool = False,
preview_output_format: str = 'url') ‑> TaskResult[ListCreativeFormatsResponse]-
Expand source code
async def list_creative_formats(
    self,
    request: ListCreativeFormatsRequest,
    fetch_previews: bool = False,
    preview_output_format: str = "url",
) -> TaskResult[ListCreativeFormatsResponse]:
    """
    List the creative formats the agent supports, optionally with previews.

    Args:
        request: Request parameters; serialized with ``None`` fields omitted
        fetch_previews: If True, generate preview URLs for each format using
            sample manifests (uses batch API for 5-10x performance improvement)
        preview_output_format: "url" for iframe URLs (default), "html" for
            direct embedding (2-3x faster, no iframe overhead)

    Returns:
        TaskResult containing ListCreativeFormatsResponse; when previews were
        generated they are stored under ``metadata["formats_with_previews"]``
    """
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type="list_creative_formats",
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    raw = await self.adapter.list_creative_formats(payload)

    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type="list_creative_formats",
        status=raw.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    parsed: TaskResult[ListCreativeFormatsResponse] = self.adapter._parse_response(
        raw, ListCreativeFormatsResponse
    )

    # Previews are only attached to a successful, non-empty result.
    if not (fetch_previews and parsed.success and parsed.data):
        return parsed

    from adcp.utils.preview_cache import add_preview_urls_to_formats

    # This client itself is passed as the client used for previews.
    previews = await add_preview_urls_to_formats(
        parsed.data.formats,
        self,
        use_batch=True,
        output_format=preview_output_format,
    )
    parsed.metadata = parsed.metadata or {}
    parsed.metadata["formats_with_previews"] = previews
    return parsed
Args
request- Request parameters
fetch_previews- If True, generate preview URLs for each format using sample manifests (uses batch API for 5-10x performance improvement)
preview_output_format- "url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead)
Returns
TaskResult containing ListCreativeFormatsResponse with optional preview URLs in metadata
async def list_creatives(self, request: ListCreativesRequest) ‑> TaskResult[ListCreativesResponse]-
Expand source code
async def list_creatives(
    self,
    request: ListCreativesRequest,
) -> TaskResult[ListCreativesResponse]:
    """
    List Creatives.

    Dumps the request to a parameter dict (``None`` fields excluded), emits
    a PROTOCOL_REQUEST activity, awaits the adapter's ``list_creatives``,
    emits a PROTOCOL_RESPONSE activity with the raw result's status, and
    returns the adapter-parsed result.

    Args:
        request: Request parameters

    Returns:
        TaskResult containing ListCreativesResponse
    """
    task_name = "list_creatives"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_reply = await self.adapter.list_creatives(payload)

    # Report the adapter's status under the same operation id.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_reply.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    parsed = self.adapter._parse_response(adapter_reply, ListCreativesResponse)
    return parsed
Args
request- Request parameters
Returns
TaskResult containing ListCreativesResponse
async def list_tools(self) ‑> list[str]-
Expand source code
async def list_tools(self) -> list[str]:
    """
    List available tools from the agent.

    Delegates directly to the protocol adapter's ``list_tools``.

    Returns:
        List of tool names
    """
    tool_names = await self.adapter.list_tools()
    return tool_names
Returns
List of tool names
async def preview_creative(self, request: PreviewCreativeRequest) ‑> TaskResult[PreviewCreativeResponse]-
Expand source code
async def preview_creative(
    self,
    request: PreviewCreativeRequest,
) -> TaskResult[PreviewCreativeResponse]:
    """
    Generate preview of a creative manifest.

    Dumps the request to a parameter dict (``None`` fields excluded), emits
    a PROTOCOL_REQUEST activity, awaits the adapter's ``preview_creative``,
    emits a PROTOCOL_RESPONSE activity with the raw result's status, and
    returns the adapter-parsed result.

    Args:
        request: Request parameters

    Returns:
        TaskResult containing PreviewCreativeResponse with preview URLs
    """
    task_name = "preview_creative"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_reply = await self.adapter.preview_creative(payload)  # type: ignore[attr-defined]

    # Report the adapter's status under the same operation id.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_reply.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    parsed = self.adapter._parse_response(adapter_reply, PreviewCreativeResponse)
    return parsed
Args
request- Request parameters
Returns
TaskResult containing PreviewCreativeResponse with preview URLs
async def provide_performance_feedback(self, request: ProvidePerformanceFeedbackRequest) ‑> TaskResult[ProvidePerformanceFeedbackResponse]-
Expand source code
async def provide_performance_feedback(
    self,
    request: ProvidePerformanceFeedbackRequest,
) -> TaskResult[ProvidePerformanceFeedbackResponse]:
    """
    Provide Performance Feedback.

    Dumps the request to a parameter dict (``None`` fields excluded), emits
    a PROTOCOL_REQUEST activity, awaits the adapter's
    ``provide_performance_feedback``, emits a PROTOCOL_RESPONSE activity
    with the raw result's status, and returns the adapter-parsed result.

    Args:
        request: Request parameters

    Returns:
        TaskResult containing ProvidePerformanceFeedbackResponse
    """
    task_name = "provide_performance_feedback"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_reply = await self.adapter.provide_performance_feedback(payload)

    # Report the adapter's status under the same operation id.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_reply.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    parsed = self.adapter._parse_response(
        adapter_reply, ProvidePerformanceFeedbackResponse
    )
    return parsed
Args
request- Request parameters
Returns
TaskResult containing ProvidePerformanceFeedbackResponse
async def sync_creatives(self, request: SyncCreativesRequest) ‑> TaskResult[SyncCreativesResponse]-
Expand source code
async def sync_creatives(
    self,
    request: SyncCreativesRequest,
) -> TaskResult[SyncCreativesResponse]:
    """
    Sync Creatives.

    Dumps the request to a parameter dict (``None`` fields excluded), emits
    a PROTOCOL_REQUEST activity, awaits the adapter's ``sync_creatives``,
    emits a PROTOCOL_RESPONSE activity with the raw result's status, and
    returns the adapter-parsed result.

    Args:
        request: Request parameters

    Returns:
        TaskResult containing SyncCreativesResponse
    """
    task_name = "sync_creatives"
    op_id = create_operation_id()
    payload = request.model_dump(exclude_none=True)

    # Announce the outgoing call before touching the adapter.
    request_event = Activity(
        type=ActivityType.PROTOCOL_REQUEST,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(request_event)

    adapter_reply = await self.adapter.sync_creatives(payload)

    # Report the adapter's status under the same operation id.
    response_event = Activity(
        type=ActivityType.PROTOCOL_RESPONSE,
        operation_id=op_id,
        agent_id=self.agent_config.id,
        task_type=task_name,
        status=adapter_reply.status,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    self._emit_activity(response_event)

    parsed = self.adapter._parse_response(adapter_reply, SyncCreativesResponse)
    return parsed
Args
request- Request parameters
Returns
TaskResult containing SyncCreativesResponse
class ADCPMultiAgentClient (agents: list[AgentConfig],
webhook_url_template: str | None = None,
webhook_secret: str | None = None,
on_activity: Callable[[Activity], None] | None = None,
handlers: dict[str, Callable[..., Any]] | None = None)-
Expand source code
class ADCPMultiAgentClient:
    """Client for managing multiple AdCP agents."""

    def __init__(
        self,
        agents: list[AgentConfig],
        webhook_url_template: str | None = None,
        webhook_secret: str | None = None,
        on_activity: Callable[[Activity], None] | None = None,
        handlers: dict[str, Callable[..., Any]] | None = None,
    ):
        """
        Initialize multi-agent client.

        One ADCPClient is built per agent configuration and stored in
        ``self.agents`` keyed by ``agent.id``; if two configurations share
        an id, the later one replaces the earlier one.

        Args:
            agents: List of agent configurations
            webhook_url_template: Template for webhook URLs
            webhook_secret: Secret for webhook verification
            on_activity: Callback for activity events
            handlers: Task completion handlers
        """
        self.agents = {
            agent.id: ADCPClient(
                agent,
                webhook_url_template=webhook_url_template,
                webhook_secret=webhook_secret,
                on_activity=on_activity,
            )
            for agent in agents
        }
        self.handlers = handlers or {}

    def agent(self, agent_id: str) -> ADCPClient:
        """
        Get client for specific agent.

        Raises:
            ValueError: If no agent with ``agent_id`` is registered
        """
        if agent_id not in self.agents:
            raise ValueError(f"Agent not found: {agent_id}")
        return self.agents[agent_id]

    @property
    def agent_ids(self) -> list[str]:
        """Get list of agent IDs."""
        return list(self.agents.keys())

    async def close(self) -> None:
        """
        Close all agent clients and clean up resources.

        Every client's ``close()`` is awaited concurrently. Shutdown is
        best-effort: a failure in one client neither stops the others nor
        propagates to the caller, but each failure is logged as a warning
        together with the id of the agent it came from.
        """
        import asyncio

        logger.debug("Closing all agent clients in multi-agent client")
        clients = list(self.agents.items())
        close_tasks = [client.close() for _, client in clients]
        outcomes = await asyncio.gather(*close_tasks, return_exceptions=True)

        # return_exceptions=True hands failures back as result values;
        # surface them instead of dropping them on the floor.
        for (agent_id, _), outcome in zip(clients, outcomes):
            if isinstance(outcome, BaseException):
                logger.warning(
                    "Failed to close agent client %s: %r", agent_id, outcome
                )

    async def __aenter__(self) -> ADCPMultiAgentClient:
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Async context manager exit."""
        await self.close()

    async def get_products(
        self,
        request: GetProductsRequest,
    ) -> list[TaskResult[GetProductsResponse]]:
        """
        Execute get_products across all agents in parallel.

        Args:
            request: Request parameters

        Returns:
            List of TaskResults containing GetProductsResponse for each agent
        """
        import asyncio

        tasks = [agent.get_products(request) for agent in self.agents.values()]
        return await asyncio.gather(*tasks)

    @classmethod
    def from_env(cls) -> ADCPMultiAgentClient:
        """
        Create client from environment variables.

        Reads the agent list as JSON from ``ADCP_AGENTS`` and the optional
        ``WEBHOOK_URL_TEMPLATE`` and ``WEBHOOK_SECRET`` variables;
        ``on_activity`` and ``handlers`` are left at their defaults.

        Raises:
            ValueError: If ``ADCP_AGENTS`` is unset or empty
        """
        agents_json = os.getenv("ADCP_AGENTS")
        if not agents_json:
            raise ValueError("ADCP_AGENTS environment variable not set")

        agents_data = json.loads(agents_json)
        agents = [AgentConfig(**agent) for agent in agents_data]

        return cls(
            agents=agents,
            webhook_url_template=os.getenv("WEBHOOK_URL_TEMPLATE"),
            webhook_secret=os.getenv("WEBHOOK_SECRET"),
        )
Initialize multi-agent client.
Args
agents- List of agent configurations
webhook_url_template- Template for webhook URLs
webhook_secret- Secret for webhook verification
on_activity- Callback for activity events
handlers- Task completion handlers
Static methods
def from_env() ‑> ADCPMultiAgentClient-
Create client from environment variables.
Instance variables
prop agent_ids : list[str]-
Expand source code
@property
def agent_ids(self) -> list[str]:
    """Get list of agent IDs (the keys of ``self.agents``, as a new list)."""
    # Iterating the mapping yields its keys.
    return list(self.agents)
Methods
def agent(self, agent_id: str) ‑> ADCPClient-
Expand source code
def agent(self, agent_id: str) -> ADCPClient:
    """
    Get client for specific agent.

    Raises:
        ValueError: If ``agent_id`` is not a key of ``self.agents``
    """
    registry = self.agents
    if agent_id in registry:
        return registry[agent_id]
    raise ValueError(f"Agent not found: {agent_id}")
async def close(self) ‑> None-
Expand source code
async def close(self) -> None:
    """
    Close all agent clients and clean up resources.

    Every client's ``close()`` is awaited concurrently. Shutdown is
    best-effort: a failure in one client neither stops the others nor
    propagates to the caller, but each failure is logged as a warning
    together with the id of the agent it came from.
    """
    import asyncio

    logger.debug("Closing all agent clients in multi-agent client")
    clients = list(self.agents.items())
    close_tasks = [client.close() for _, client in clients]
    outcomes = await asyncio.gather(*close_tasks, return_exceptions=True)

    # return_exceptions=True hands failures back as result values;
    # surface them instead of dropping them on the floor.
    for (agent_id, _), outcome in zip(clients, outcomes):
        if isinstance(outcome, BaseException):
            logger.warning(
                "Failed to close agent client %s: %r", agent_id, outcome
            )
async def get_products(self, request: GetProductsRequest) ‑> list[TaskResult[GetProductsResponse]]-
Expand source code
async def get_products(
    self,
    request: GetProductsRequest,
) -> list[TaskResult[GetProductsResponse]]:
    """
    Execute get_products across all agents in parallel.

    The same request object is handed to every registered agent client and
    the calls are awaited together; results come back in the iteration
    order of ``self.agents``.

    Args:
        request: Request parameters

    Returns:
        List of TaskResults containing GetProductsResponse for each agent
    """
    import asyncio

    # The unpacking creates every coroutine before gather() is invoked.
    return await asyncio.gather(
        *(client.get_products(request) for client in self.agents.values())
    )
Args
request- Request parameters
Returns
List of TaskResults containing GetProductsResponse for each agent