Module adcp.client
Classes
class ADCPClient (agent_config: AgentConfig,
webhook_url_template: str | None = None,
webhook_secret: str | None = None,
on_activity: Callable[[Activity], None] | None = None,
webhook_timestamp_tolerance: int = 300,
capabilities_ttl: float = 3600.0,
validate_features: bool = False,
strict_idempotency: bool = False,
signing: SigningConfig | None = None)-
Expand source code
class ADCPClient: """Client for interacting with a single AdCP agent.""" def __init__( self, agent_config: AgentConfig, webhook_url_template: str | None = None, webhook_secret: str | None = None, on_activity: Callable[[Activity], None] | None = None, webhook_timestamp_tolerance: int = 300, capabilities_ttl: float = 3600.0, validate_features: bool = False, strict_idempotency: bool = False, signing: SigningConfig | None = None, ): """ Initialize ADCP client for a single agent. Args: agent_config: Agent configuration webhook_url_template: Template for webhook URLs with {agent_id}, {task_type}, {operation_id} webhook_secret: Secret for webhook signature verification on_activity: Callback for activity events webhook_timestamp_tolerance: Maximum age (in seconds) for webhook timestamps. Webhooks with timestamps older than this or more than this far in the future are rejected. Defaults to 300 (5 minutes). capabilities_ttl: Time-to-live in seconds for cached capabilities (default: 1 hour) validate_features: When True, automatically check that the seller supports required features before making task calls (e.g., sync_audiences requires audience_targeting). Requires capabilities to have been fetched first. strict_idempotency: When True, verify the seller declared ``adcp.idempotency.replay_ttl_seconds`` in capabilities before any mutating call. Fetches capabilities lazily on first use. Raises ``IdempotencyUnsupportedError`` if the declaration is missing — sellers that don't declare it provide no retry-safety guarantee per AdCP #2315. Defaults to False for backward compatibility. signing: Optional RFC 9421 request-signing config. When provided, the client automatically attaches ``Signature`` / ``Signature-Input`` / ``Content-Digest`` headers to operations the seller's ``request_signing`` capability lists in ``required_for``, ``warn_for``, or ``supported_for``. The seller's ``covers_content_digest`` policy determines whether the body is bound to the signature. Generate a key with ``adcp-keygen`` and publish the public JWK at your ``jwks_uri``. Supported on both A2A and MCP (``mcp_transport="streamable_http"``); SSE-transport MCP logs a warning and falls through unsigned. """ self.agent_config = agent_config self.webhook_url_template = webhook_url_template self.webhook_secret = webhook_secret self.on_activity = on_activity self.webhook_timestamp_tolerance = webhook_timestamp_tolerance self.capabilities_ttl = capabilities_ttl self.validate_features = validate_features self.strict_idempotency = strict_idempotency self.signing = signing # Capabilities cache self._capabilities: GetAdcpCapabilitiesResponse | None = None self._feature_resolver: FeatureResolver | None = None self._capabilities_fetched_at: float | None = None self._idempotency_capability_verified: bool = False # Unique per-instance token so use_idempotency_key scopes to this # client and does not bleed to siblings (AdCP #2315 cross-seller risk). from uuid import uuid4 as _uuid4 self._idempotency_client_token: str = _uuid4().hex # Initialize protocol adapter self.adapter: ProtocolAdapter if agent_config.protocol == Protocol.A2A: self.adapter = A2AAdapter(agent_config) elif agent_config.protocol == Protocol.MCP: self.adapter = MCPAdapter(agent_config) else: raise ValueError(f"Unsupported protocol: {agent_config.protocol}") self.adapter.idempotency_client_token = self._idempotency_client_token if strict_idempotency: self.adapter.idempotency_capability_check = self._ensure_idempotency_capability if signing is not None: self.adapter.signing_request_hook = self._sign_outgoing_request # Initialize simple API accessor (lazy import to avoid circular dependency) from adcp.simple import SimpleAPI self.simple = SimpleAPI(self) async def _ensure_idempotency_capability(self) -> None: """Verify the seller positively declares idempotency support in capabilities. Called before every mutating request when ``strict_idempotency=True``. Fetches capabilities on first invocation; subsequent calls are no-ops once the declaration has been observed. Raises ``IdempotencyUnsupportedError`` when ``adcp.idempotency`` is missing, declares ``supported=False`` (seller does not dedupe — naive retry would double-process), or declares ``supported=True`` without a ``replay_ttl_seconds`` window. Sets ``_idempotency_capability_verified = True`` BEFORE calling ``fetch_capabilities`` so any recursive dispatch through the adapter terminates (``get_adcp_capabilities`` is non-mutating, so it would short-circuit anyway — but this guard protects against future refactors that might add it to the mutating set). """ from adcp.exceptions import IdempotencyUnsupportedError if self._idempotency_capability_verified: return self._idempotency_capability_verified = True try: caps = await self.fetch_capabilities() adcp_info = getattr(caps, "adcp", None) idempotency_info = getattr(adcp_info, "idempotency", None) if adcp_info else None if idempotency_info is None: raise IdempotencyUnsupportedError( agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, reason="seller did not declare adcp.idempotency", ) supported = getattr(idempotency_info, "supported", None) if supported is False: raise IdempotencyUnsupportedError( agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, reason="seller declared adcp.idempotency.supported=false", ) ttl = getattr(idempotency_info, "replay_ttl_seconds", None) if ttl is None: raise IdempotencyUnsupportedError( agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, reason=( "seller declared adcp.idempotency.supported=true but omitted " "replay_ttl_seconds" ), ) except Exception: self._idempotency_capability_verified = False raise async def _sign_outgoing_request(self, request: httpx.Request) -> None: """httpx request event hook that attaches RFC 9421 signature headers. Installed on the protocol adapter's httpx client when a ``SigningConfig`` was passed to ``ADCPClient``. Consults the seller's advertised ``request_signing`` capability and signs only the operations the seller listed in ``required_for``, ``warn_for``, or ``supported_for`` — other requests (including the agent-card fetch and ``get_adcp_capabilities`` itself) pass through unsigned. The ``covers_content_digest`` tri-state determines whether the body is bound to the signature. """ if self.signing is None: return operation = _signing_current_operation.get() # Unset ContextVar → out-of-band call (agent-card fetch, session # initialize, etc). Skip without fetching capabilities. # # get_adcp_capabilities → bootstrap carve-out: signing it would # require capabilities we don't have yet, and if a pathological # seller listed this op in its own required_for we'd recurse. # Keep this check narrow — only operations strictly required to # *obtain* capabilities belong here. Today that's just # get_adcp_capabilities. A future adapter that adds another # capabilities-precondition op MUST extend this guard. if operation is None or operation == "get_adcp_capabilities": return caps = await self.fetch_capabilities() req_signing = getattr(caps, "request_signing", None) # Detect and surface a malformed seller config: supported=False is # "signatures are ignored", but populating required_for alongside # it is contradictory. The classifier correctly skips (matches # verifier behavior) but the silent downgrade hides a config bug # that will bite pilots. if ( req_signing is not None and not req_signing.supported and (req_signing.required_for or req_signing.warn_for) ): logger.warning( "Seller %s advertises request_signing.supported=false but " "populates required_for/warn_for — treating as unsupported " "per spec. Verify the seller's capability advertisement.", self.agent_config.id, ) decision = operation_needs_signing(req_signing, operation) if decision == "skip": return covers_policy: str | None = None if req_signing is not None and req_signing.covers_content_digest is not None: covers_policy = req_signing.covers_content_digest.value if covers_policy == "forbidden": cover_digest = False elif covers_policy == "required": cover_digest = True else: # "either" or absent — signer's choice; default stricter. cover_digest = True body = request.content signed = sign_request( method=request.method, url=str(request.url), headers=dict(request.headers), body=body, private_key=self.signing.private_key, key_id=self.signing.key_id, alg=self.signing.alg, cover_content_digest=cover_digest, tag=self.signing.tag, ) # pop-then-set ensures our signed values are authoritative even if # another hook or earlier layer added a same-named header. httpx # headers are a case-insensitive MultiDict, so a naive assignment # could leave a duplicate value in a different case. for header_name, header_value in signed.as_dict().items(): request.headers.pop(header_name, None) request.headers[header_name] = header_value def get_webhook_url(self, task_type: str, operation_id: str) -> str: """Generate webhook URL for a task.""" if not self.webhook_url_template: raise ValueError("webhook_url_template not configured") return self.webhook_url_template.format( agent_id=self.agent_config.id, task_type=task_type, operation_id=operation_id, ) def _emit_activity(self, activity: Activity) -> None: """Emit activity event.""" if self.on_activity: self.on_activity(activity) @contextlib.contextmanager def use_idempotency_key(self, key: str) -> Iterator[str]: """Pin an ``idempotency_key`` for the next mutating call on THIS client. Use when you've persisted a key (e.g., in a buyer-side database) and want the SDK to send that exact key on resume or retry across process restarts. The key is validated against ``^[A-Za-z0-9_.:-]{16,255}$`` on entry; a ``ValueError`` is raised for malformed keys. Scope rules: * **Single-use within scope.** The first mutating call inside the ``with`` block consumes the pinned key; a second mutating call falls through to a fresh UUID. This protects against ``asyncio.gather`` siblings accidentally sharing the key (which would trigger ``IDEMPOTENCY_CONFLICT`` or silently duplicate work). If you need to retry, wrap each attempt in its own ``with`` block. * **Client-scoped.** The pinned key applies only to calls on THIS client. A mutating call on a sibling ``ADCPClient`` inside the same ``with`` block generates a fresh key and emits a ``UserWarning`` — keys must be unique per (seller, request) pair (AdCP #2315). * **No nesting.** Nested ``use_idempotency_key`` on the same client raises ``RuntimeError``. Example:: with client.use_idempotency_key(campaign.stored_key): result = await client.create_media_buy(request) """ from adcp import _idempotency _idempotency.validate_key(key) token = self._idempotency_client_token if token in _idempotency._scoped_keys: raise RuntimeError( "use_idempotency_key is already active on this client; " "nested usage is not supported." ) _idempotency._scoped_keys[token] = key try: yield key finally: _idempotency._scoped_keys.pop(token, None) # ======================================================================== # Capability Validation # ======================================================================== @property def capabilities(self) -> GetAdcpCapabilitiesResponse | None: """Return cached capabilities, or None if not yet fetched.""" return self._capabilities @property def feature_resolver(self) -> FeatureResolver | None: """Return the FeatureResolver for cached capabilities, or None.""" return self._feature_resolver async def fetch_capabilities(self) -> GetAdcpCapabilitiesResponse: """Fetch capabilities, using cache if still valid. Returns: The seller's capabilities response. """ if self._capabilities is not None and self._capabilities_fetched_at is not None: elapsed = time.monotonic() - self._capabilities_fetched_at if elapsed < self.capabilities_ttl: return self._capabilities return await self.refresh_capabilities() async def refresh_capabilities(self) -> GetAdcpCapabilitiesResponse: """Fetch capabilities from the seller, bypassing cache. Returns: The seller's capabilities response. """ result = await self.get_adcp_capabilities(GetAdcpCapabilitiesRequest()) if result.success and result.data is not None: self._capabilities = result.data self._feature_resolver = FeatureResolver(result.data) self._capabilities_fetched_at = time.monotonic() return self._capabilities raise ADCPError( f"Failed to fetch capabilities: {result.error or result.message}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) def _ensure_resolver(self) -> FeatureResolver: """Return the FeatureResolver, raising if capabilities haven't been fetched.""" if self._feature_resolver is None: raise ADCPError( "Cannot check feature support: capabilities have not been fetched. " "Call fetch_capabilities() first.", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) return self._feature_resolver def supports(self, feature: str) -> bool: """Check if the seller supports a feature. Supports multiple feature namespaces: - Protocol support: ``supports("media_buy")`` checks ``supported_protocols`` - Extension support: ``supports("ext:scope3")`` checks ``extensions_supported`` - Targeting: ``supports("targeting.geo_countries")`` checks ``media_buy.execution.targeting`` - Media buy features: ``supports("audience_targeting")`` checks ``media_buy.features`` - Signals features: ``supports("catalog_signals")`` checks ``signals.features`` Args: feature: Feature identifier to check. Returns: True if the seller declares the feature as supported. Raises: ADCPError: If capabilities have not been fetched yet. """ return self._ensure_resolver().supports(feature) def require(self, *features: str) -> None: """Assert that the seller supports all listed features. Args: *features: Feature identifiers to require. Raises: ADCPFeatureUnsupportedError: If any features are not supported. ADCPError: If capabilities have not been fetched yet. """ self._ensure_resolver().require( *features, agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, ) def _validate_task_features(self, task_name: str) -> None: """Check feature requirements for a task if validate_features is enabled. Returns without checking if validate_features is False or capabilities haven't been fetched yet (logs a warning in the latter case). """ if not self.validate_features: return if self._feature_resolver is None: logger.warning( "validate_features is enabled but capabilities have not been fetched. " "Call fetch_capabilities() to enable feature validation." ) return required_feature = TASK_FEATURE_MAP.get(task_name) if required_feature is None: return self.require(required_feature) async def get_products( self, request: GetProductsRequest, fetch_previews: bool = False, preview_output_format: str = "url", creative_agent_client: ADCPClient | None = None, ) -> TaskResult[GetProductsResponse]: """ Get advertising products. Args: request: Request parameters fetch_previews: If True, generate preview URLs for each product's formats (uses batch API for 5-10x performance improvement) preview_output_format: "url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead) creative_agent_client: Client for creative agent (required if fetch_previews=True) Returns: TaskResult containing GetProductsResponse with optional preview URLs in metadata Raises: ValueError: If fetch_previews=True but creative_agent_client is not provided """ if fetch_previews and not creative_agent_client: raise ValueError("creative_agent_client is required when fetch_previews=True") operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_products", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_products(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_products", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) result: TaskResult[GetProductsResponse] = self.adapter._parse_response( raw_result, GetProductsResponse ) if fetch_previews and result.success and result.data and creative_agent_client: from adcp.utils.preview_cache import add_preview_urls_to_products products_with_previews = await add_preview_urls_to_products( result.data.products, creative_agent_client, use_batch=True, output_format=preview_output_format, ) result.metadata = result.metadata or {} result.metadata["products_with_previews"] = products_with_previews return result async def list_creative_formats( self, request: ListCreativeFormatsRequest, fetch_previews: bool = False, preview_output_format: str = "url", ) -> TaskResult[ListCreativeFormatsResponse]: """ List supported creative formats. Args: request: Request parameters fetch_previews: If True, generate preview URLs for each format using sample manifests (uses batch API for 5-10x performance improvement) preview_output_format: "url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead) Returns: TaskResult containing ListCreativeFormatsResponse with optional preview URLs in metadata """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_creative_formats", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_creative_formats(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_creative_formats", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) result: TaskResult[ListCreativeFormatsResponse] = self.adapter._parse_response( raw_result, ListCreativeFormatsResponse ) if fetch_previews and result.success and result.data: from adcp.utils.preview_cache import add_preview_urls_to_formats formats_with_previews = await add_preview_urls_to_formats( result.data.formats, self, use_batch=True, output_format=preview_output_format, ) result.metadata = result.metadata or {} result.metadata["formats_with_previews"] = formats_with_previews return result async def preview_creative( self, request: PreviewCreativeRequest, ) -> TaskResult[PreviewCreativeResponse]: """ Generate preview of a creative manifest. Args: request: Request parameters Returns: TaskResult containing PreviewCreativeResponse with preview URLs """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="preview_creative", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.preview_creative(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="preview_creative", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, PreviewCreativeResponse) async def sync_creatives( self, request: SyncCreativesRequest, ) -> TaskResult[SyncCreativesResponse]: """ Sync Creatives. Args: request: Request parameters Returns: TaskResult containing SyncCreativesResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_creatives", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_creatives(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_creatives", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncCreativesResponse) async def list_creatives( self, request: ListCreativesRequest, ) -> TaskResult[ListCreativesResponse]: """ List Creatives. Args: request: Request parameters Returns: TaskResult containing ListCreativesResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_creatives", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_creatives(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_creatives", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListCreativesResponse) async def get_media_buy_delivery( self, request: GetMediaBuyDeliveryRequest, ) -> TaskResult[GetMediaBuyDeliveryResponse]: """ Get Media Buy Delivery. Args: request: Request parameters Returns: TaskResult containing GetMediaBuyDeliveryResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buy_delivery", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_media_buy_delivery(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buy_delivery", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetMediaBuyDeliveryResponse) async def get_media_buys( self, request: GetMediaBuysRequest, ) -> TaskResult[GetMediaBuysResponse]: """ Get Media Buys. Args: request: Request parameters Returns: TaskResult containing GetMediaBuysResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buys", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_media_buys(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buys", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetMediaBuysResponse) async def get_signals( self, request: GetSignalsRequest, ) -> TaskResult[GetSignalsResponse]: """ Get Signals. Args: request: Request parameters Returns: TaskResult containing GetSignalsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_signals", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_signals(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_signals", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetSignalsResponse) async def activate_signal( self, request: ActivateSignalRequest, ) -> TaskResult[ActivateSignalResponse]: """ Activate Signal. Args: request: Request parameters Returns: TaskResult containing ActivateSignalResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="activate_signal", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.activate_signal(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="activate_signal", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ActivateSignalResponse) async def provide_performance_feedback( self, request: ProvidePerformanceFeedbackRequest, ) -> TaskResult[ProvidePerformanceFeedbackResponse]: """ Provide Performance Feedback. Args: request: Request parameters Returns: TaskResult containing ProvidePerformanceFeedbackResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="provide_performance_feedback", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.provide_performance_feedback(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="provide_performance_feedback", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ProvidePerformanceFeedbackResponse) async def create_media_buy( self, request: CreateMediaBuyRequest, ) -> TaskResult[CreateMediaBuyResponse]: """ Create a new media buy reservation. Requests the agent to reserve inventory for a campaign. The agent returns a media_buy_id that tracks this reservation and can be used for updates. Args: request: Media buy creation parameters including: - brand: Brand reference; resolved from brand.json or the registry at execution - packages: List of package requests specifying desired inventory - publisher_properties: Target properties for ad placement - budget: Optional budget constraints - start_date/end_date: Campaign flight dates Returns: TaskResult containing CreateMediaBuyResponse with: - media_buy_id: Unique identifier for this reservation - status: Current state of the media buy - packages: Confirmed package details - Additional platform-specific metadata Example: >>> from adcp import ADCPClient, CreateMediaBuyRequest, BrandReference >>> client = ADCPClient(agent_config) >>> request = CreateMediaBuyRequest( ... brand=BrandReference(domain="acme.com"), ... packages=[package_request], ... publisher_properties=properties, ... ) >>> result = await client.create_media_buy(request) >>> if result.success: ... media_buy_id = result.data.media_buy_id """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_media_buy", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.create_media_buy(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_media_buy", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CreateMediaBuyResponse) async def update_media_buy( self, request: UpdateMediaBuyRequest, ) -> TaskResult[UpdateMediaBuyResponse]: """ Update an existing media buy reservation. Modifies a previously created media buy by updating packages or publisher properties. The update operation uses discriminated unions to specify what to change - either package details or targeting properties. Args: request: Media buy update parameters including: - media_buy_id: Identifier from create_media_buy response - updates: Discriminated union specifying update type: * UpdateMediaBuyPackagesRequest: Modify package selections * UpdateMediaBuyPropertiesRequest: Change targeting properties Returns: TaskResult containing UpdateMediaBuyResponse with: - media_buy_id: The updated media buy identifier - status: Updated state of the media buy - packages: Updated package configurations - Additional platform-specific metadata Example: >>> from adcp import ADCPClient, UpdateMediaBuyPackagesRequest >>> client = ADCPClient(agent_config) >>> request = UpdateMediaBuyPackagesRequest( ... media_buy_id="mb_123", ... packages=[updated_package] ... ) >>> result = await client.update_media_buy(request) >>> if result.success: ... updated_packages = result.data.packages """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_media_buy", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.update_media_buy(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_media_buy", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, UpdateMediaBuyResponse) async def build_creative( self, request: BuildCreativeRequest, ) -> TaskResult[BuildCreativeResponse]: """ Generate production-ready creative assets. Requests the creative agent to build final deliverable assets in the target format (e.g., VAST, DAAST, HTML5). This is typically called after previewing and approving a creative manifest. Args: request: Creative build parameters including: - manifest: Creative manifest with brand info and content - target_format_id: Desired output format identifier - inputs: Optional user-provided inputs for template variables - deployment: Platform or agent deployment configuration Returns: TaskResult containing BuildCreativeResponse with: - assets: Production-ready creative files (URLs or inline content) - format_id: The generated format identifier - manifest: The creative manifest used for generation - metadata: Additional platform-specific details Example: >>> from adcp import ADCPClient, BuildCreativeRequest >>> client = ADCPClient(agent_config) >>> request = BuildCreativeRequest( ... manifest=creative_manifest, ... target_format_id="vast_2.0", ... inputs={"duration": 30} ... ) >>> result = await client.build_creative(request) >>> if result.success: ... vast_url = result.data.assets[0].url """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="build_creative", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.build_creative(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="build_creative", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, BuildCreativeResponse) async def list_accounts( self, request: ListAccountsRequest, ) -> TaskResult[ListAccountsResponse]: """ List Accounts. Args: request: Request parameters Returns: TaskResult containing ListAccountsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_accounts", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_accounts(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_accounts", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListAccountsResponse) async def sync_accounts( self, request: SyncAccountsRequest, ) -> TaskResult[SyncAccountsResponse]: """ Sync Accounts. Args: request: Request parameters Returns: TaskResult containing SyncAccountsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_accounts", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_accounts(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_accounts", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncAccountsResponse) async def get_account_financials( self, request: GetAccountFinancialsRequest, ) -> TaskResult[GetAccountFinancialsResponse]: """ Get Account Financials. Args: request: Request parameters Returns: TaskResult containing GetAccountFinancialsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_account_financials", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_account_financials(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_account_financials", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetAccountFinancialsResponse) async def report_usage( self, request: ReportUsageRequest, ) -> TaskResult[ReportUsageResponse]: """ Report Usage. Args: request: Request parameters Returns: TaskResult containing ReportUsageResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="report_usage", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.report_usage(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="report_usage", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ReportUsageResponse) async def log_event( self, request: LogEventRequest, ) -> TaskResult[LogEventResponse]: """ Log Event. Args: request: Request parameters Returns: TaskResult containing LogEventResponse """ self._validate_task_features("log_event") operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="log_event", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.log_event(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="log_event", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, LogEventResponse) async def sync_event_sources( self, request: SyncEventSourcesRequest, ) -> TaskResult[SyncEventSourcesResponse]: """ Sync Event Sources. Args: request: Request parameters Returns: TaskResult containing SyncEventSourcesResponse """ self._validate_task_features("sync_event_sources") operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_event_sources", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_event_sources(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_event_sources", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncEventSourcesResponse) async def sync_audiences( self, request: SyncAudiencesRequest, ) -> TaskResult[SyncAudiencesResponse]: """ Sync Audiences. Args: request: Request parameters Returns: TaskResult containing SyncAudiencesResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_audiences", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_audiences(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_audiences", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncAudiencesResponse) async def sync_catalogs( self, request: SyncCatalogsRequest, ) -> TaskResult[SyncCatalogsResponse]: """ Sync Catalogs. Args: request: Request parameters Returns: TaskResult containing SyncCatalogsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_catalogs", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_catalogs(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_catalogs", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncCatalogsResponse) async def get_creative_delivery( self, request: GetCreativeDeliveryRequest, ) -> TaskResult[GetCreativeDeliveryResponse]: """ Get Creative Delivery. Args: request: Request parameters Returns: TaskResult containing GetCreativeDeliveryResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_creative_delivery", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_creative_delivery(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_creative_delivery", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetCreativeDeliveryResponse) # ======================================================================== # V3 Protocol Methods - Protocol Discovery # ======================================================================== async def get_adcp_capabilities( self, request: GetAdcpCapabilitiesRequest, ) -> TaskResult[GetAdcpCapabilitiesResponse]: """ Get AdCP capabilities from the agent. Queries the agent's supported AdCP features, protocol versions, and domain-specific capabilities (media_buy, signals, sponsored_intelligence). Args: request: Request parameters including optional protocol filters Returns: TaskResult containing GetAdcpCapabilitiesResponse with: - adcp: Core protocol version information - supported_protocols: List of supported domain protocols - media_buy: Media buy capabilities (if supported) - sponsored_intelligence: SI capabilities (if supported) - signals: Signals capabilities (if supported) """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_adcp_capabilities", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_adcp_capabilities(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_adcp_capabilities", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetAdcpCapabilitiesResponse) # ======================================================================== # V3 Protocol Methods - Content Standards # ======================================================================== async def create_content_standards( self, request: CreateContentStandardsRequest, ) -> TaskResult[CreateContentStandardsResponse]: """ Create a new content standards configuration. Defines acceptable content contexts for ad placement using natural language policy and optional calibration exemplars. Args: request: Request parameters including policy and scope Returns: TaskResult containing CreateContentStandardsResponse with standards_id """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_content_standards", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.create_content_standards(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_content_standards", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CreateContentStandardsResponse) async def get_content_standards( self, request: GetContentStandardsRequest, ) -> TaskResult[GetContentStandardsResponse]: """ Get a content standards configuration by ID. Args: request: Request parameters including standards_id Returns: TaskResult containing GetContentStandardsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_content_standards", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_content_standards(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_content_standards", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetContentStandardsResponse) async def list_content_standards( self, request: ListContentStandardsRequest, ) -> TaskResult[ListContentStandardsResponse]: """ List content standards configurations. Args: request: Request parameters including optional filters Returns: TaskResult containing ListContentStandardsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_content_standards", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_content_standards(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_content_standards", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListContentStandardsResponse) async def update_content_standards( self, request: UpdateContentStandardsRequest, ) -> TaskResult[UpdateContentStandardsResponse]: """ Update a content standards configuration. Args: request: Request parameters including standards_id and updates Returns: TaskResult containing UpdateContentStandardsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_content_standards", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.update_content_standards(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_content_standards", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, UpdateContentStandardsResponse) async def calibrate_content( self, request: CalibrateContentRequest, ) -> TaskResult[CalibrateContentResponse]: """ Calibrate content against standards. Evaluates content (artifact or URL) against configured standards to determine suitability for ad placement. Args: request: Request parameters including content to evaluate Returns: TaskResult containing CalibrateContentResponse with verdict """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="calibrate_content", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.calibrate_content(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="calibrate_content", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CalibrateContentResponse) async def validate_content_delivery( self, request: ValidateContentDeliveryRequest, ) -> TaskResult[ValidateContentDeliveryResponse]: """ Validate content delivery against standards. Validates that ad delivery records comply with content standards. Args: request: Request parameters including delivery records Returns: TaskResult containing ValidateContentDeliveryResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="validate_content_delivery", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.validate_content_delivery(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="validate_content_delivery", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ValidateContentDeliveryResponse) async def get_media_buy_artifacts( self, request: GetMediaBuyArtifactsRequest, ) -> TaskResult[GetMediaBuyArtifactsResponse]: """ Get artifacts associated with a media buy. Retrieves content artifacts where ads were delivered for a media buy. Args: request: Request parameters including media_buy_id Returns: TaskResult containing GetMediaBuyArtifactsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buy_artifacts", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_media_buy_artifacts(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buy_artifacts", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetMediaBuyArtifactsResponse) # ======================================================================== # V3 Protocol Methods - Sponsored Intelligence # ======================================================================== async def si_get_offering( self, request: SiGetOfferingRequest, ) -> TaskResult[SiGetOfferingResponse]: """ Get sponsored intelligence offering. Retrieves product/service offerings that can be presented in a sponsored intelligence session. Args: request: Request parameters including brand context Returns: TaskResult containing SiGetOfferingResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_get_offering", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.si_get_offering(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_get_offering", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SiGetOfferingResponse) async def si_initiate_session( self, request: SiInitiateSessionRequest, ) -> TaskResult[SiInitiateSessionResponse]: """ Initiate a sponsored intelligence session. Starts a conversational brand experience session with a user. Args: request: Request parameters including identity and context Returns: TaskResult containing SiInitiateSessionResponse with session_id """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_initiate_session", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.si_initiate_session(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_initiate_session", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SiInitiateSessionResponse) async def si_send_message( self, request: SiSendMessageRequest, ) -> TaskResult[SiSendMessageResponse]: """ Send a message in a sponsored intelligence session. Continues the conversation in an active SI session. Args: request: Request parameters including session_id and message Returns: TaskResult containing SiSendMessageResponse with brand response """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_send_message", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.si_send_message(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_send_message", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SiSendMessageResponse) async def si_terminate_session( self, request: SiTerminateSessionRequest, ) -> TaskResult[SiTerminateSessionResponse]: """ Terminate a sponsored intelligence session. Ends an active SI session, optionally with follow-up actions. Args: request: Request parameters including session_id and termination context Returns: TaskResult containing SiTerminateSessionResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_terminate_session", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.si_terminate_session(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_terminate_session", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SiTerminateSessionResponse) # ======================================================================== # V3 Governance Methods # ======================================================================== async def get_creative_features( self, request: GetCreativeFeaturesRequest, ) -> TaskResult[GetCreativeFeaturesResponse]: """Evaluate governance features for a creative manifest.""" operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_creative_features", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_creative_features(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_creative_features", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetCreativeFeaturesResponse) async def sync_plans( self, request: SyncPlansRequest, ) -> TaskResult[SyncPlansResponse]: """Sync campaign governance plans to the governance agent.""" operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_plans", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_plans(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_plans", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncPlansResponse) async def check_governance( self, request: CheckGovernanceRequest, ) -> TaskResult[CheckGovernanceResponse]: """Check a proposed or committed action against campaign governance.""" operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="check_governance", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.check_governance(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="check_governance", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CheckGovernanceResponse) async def report_plan_outcome( self, request: ReportPlanOutcomeRequest, ) -> TaskResult[ReportPlanOutcomeResponse]: """Report the outcome of a governed action to the governance agent.""" operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="report_plan_outcome", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.report_plan_outcome(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="report_plan_outcome", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ReportPlanOutcomeResponse) async def get_plan_audit_logs( self, request: GetPlanAuditLogsRequest, ) -> TaskResult[GetPlanAuditLogsResponse]: """Retrieve governance state and audit logs for one or more plans.""" operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_plan_audit_logs", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_plan_audit_logs(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_plan_audit_logs", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetPlanAuditLogsResponse) async def create_property_list( self, request: CreatePropertyListRequest, ) -> TaskResult[CreatePropertyListResponse]: """ Create a property list for governance filtering. Property lists define dynamic sets of properties based on filters, brand manifests, and feature requirements. Args: request: Request parameters for creating the property list Returns: TaskResult containing CreatePropertyListResponse with list_id """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_property_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.create_property_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_property_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CreatePropertyListResponse) async def get_property_list( self, request: GetPropertyListRequest, ) -> TaskResult[GetPropertyListResponse]: """ Get a property list with optional resolution. When resolve=true, returns the list of resolved property identifiers. Use this to get the actual properties that match the list's filters. Args: request: Request parameters including list_id and resolve flag Returns: TaskResult containing GetPropertyListResponse with identifiers """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_property_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_property_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_property_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetPropertyListResponse) async def list_property_lists( self, request: ListPropertyListsRequest, ) -> TaskResult[ListPropertyListsResponse]: """ List property lists owned by a principal. Retrieves metadata for all property lists, optionally filtered by principal or pagination parameters. Args: request: Request parameters with optional filtering Returns: TaskResult containing ListPropertyListsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_property_lists", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_property_lists(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_property_lists", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListPropertyListsResponse) async def update_property_list( self, request: UpdatePropertyListRequest, ) -> TaskResult[UpdatePropertyListResponse]: """ Update a property list. Modifies the filters, brand manifest, or other parameters of an existing property list. Args: request: Request parameters with list_id and updates Returns: TaskResult containing UpdatePropertyListResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_property_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.update_property_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_property_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, UpdatePropertyListResponse) async def delete_property_list( self, request: DeletePropertyListRequest, ) -> TaskResult[DeletePropertyListResponse]: """ Delete a property list. Removes a property list. Any active subscriptions to this list will be terminated. Args: request: Request parameters with list_id Returns: TaskResult containing DeletePropertyListResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="delete_property_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.delete_property_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="delete_property_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, DeletePropertyListResponse) # ======================================================================== # V3 Protocol Methods - Governance (Collection Lists) # ======================================================================== async def create_collection_list( self, request: CreateCollectionListRequest, ) -> TaskResult[CreateCollectionListResponse]: """Create a collection list for governance filtering. Collection lists define dynamic sets of collections (properties, segments, etc.) that can be referenced by authorization rules and audience scoping. Args: request: Request parameters for creating the collection list Returns: TaskResult containing CreateCollectionListResponse with list_id """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_collection_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.create_collection_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_collection_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CreateCollectionListResponse) async def get_collection_list( self, request: GetCollectionListRequest, ) -> TaskResult[GetCollectionListResponse]: """Get a collection list with optional resolution. When resolve=true, returns the resolved members of the collection list. Args: request: Request parameters including list_id and resolve flag Returns: TaskResult containing GetCollectionListResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_collection_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_collection_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_collection_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetCollectionListResponse) async def list_collection_lists( self, request: ListCollectionListsRequest, ) -> TaskResult[ListCollectionListsResponse]: """List collection lists owned by a principal. Args: request: Request parameters with optional filtering Returns: TaskResult containing ListCollectionListsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_collection_lists", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_collection_lists(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_collection_lists", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListCollectionListsResponse) async def update_collection_list( self, request: UpdateCollectionListRequest, ) -> TaskResult[UpdateCollectionListResponse]: """Update a collection list. Args: request: Request parameters with list_id and updates Returns: TaskResult containing UpdateCollectionListResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_collection_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.update_collection_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_collection_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, UpdateCollectionListResponse) async def delete_collection_list( self, request: DeleteCollectionListRequest, ) -> TaskResult[DeleteCollectionListResponse]: """Delete a collection list. Args: request: Request parameters with list_id Returns: TaskResult containing DeleteCollectionListResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="delete_collection_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.delete_collection_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="delete_collection_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, DeleteCollectionListResponse) # ======================================================================== # V3 Protocol Methods - Governance (Sync Governance) # ======================================================================== async def sync_governance( self, request: SyncGovernanceRequest, ) -> TaskResult[SyncGovernanceResponse]: """Sync governance agents attached to an account. Attach, detach, or replace the set of governance agents that must be consulted for plan approval on an account. Args: request: Request parameters with account and governance agents Returns: TaskResult containing SyncGovernanceResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_governance", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_governance(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_governance", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncGovernanceResponse) # ======================================================================== # V3 Protocol Methods - Temporal Matching Protocol (TMP) # ======================================================================== async def context_match( self, request: ContextMatchRequest, ) -> TaskResult[ContextMatchResponse]: """Match ad context to buyer packages. Evaluates contextual signals for a publisher placement against the buyer's active packages and returns matching offers. Args: request: Context match request with placement, property, and optional artifact refs, context signals, and geo data. Returns: TaskResult containing ContextMatchResponse with offers. """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True, by_alias=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="context_match", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.context_match(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="context_match", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ContextMatchResponse) async def identity_match( self, request: IdentityMatchRequest, ) -> TaskResult[IdentityMatchResponse]: """Match user identity for package eligibility. Evaluates a user identity token against all active packages for frequency capping and personalization. Args: request: Identity match request with user_token, uid_type, and package_ids. Returns: TaskResult containing IdentityMatchResponse with eligible_package_ids. """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True, by_alias=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="identity_match", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.identity_match(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="identity_match", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, IdentityMatchResponse) # ======================================================================== # V3 Protocol Methods - Brand Rights # ======================================================================== async def get_brand_identity( self, request: GetBrandIdentityRequest, ) -> TaskResult[GetBrandIdentityResponse]: """Get brand identity information. Retrieves brand identity data including logos, colors, fonts, voice synthesis config, and rights availability. Args: request: Request with brand_id and optional fields filter. Returns: TaskResult containing GetBrandIdentityResponse. """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_brand_identity", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_brand_identity(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_brand_identity", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetBrandIdentityResponse) async def get_rights( self, request: GetRightsRequest, ) -> TaskResult[GetRightsResponse]: """Get available rights for licensing. Searches for rights offerings using natural language query and filters by type, uses, countries, and buyer compatibility. Args: request: Request with query, uses, and optional filters. Returns: TaskResult containing GetRightsResponse with matched rights. """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_rights", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_rights(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_rights", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetRightsResponse) async def acquire_rights( self, request: AcquireRightsRequest, ) -> TaskResult[AcquireRightsResponse]: """Acquire rights for brand content usage. Binding contractual request to license rights for a campaign. Returns credentials for generating rights-cleared content. Args: request: Request with rights_id, pricing_option_id, buyer, campaign, and revocation_webhook. Returns: TaskResult containing AcquireRightsResponse (acquired, pending_approval, rejected, or error). """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="acquire_rights", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.acquire_rights(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="acquire_rights", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, AcquireRightsResponse) async def update_rights( self, request: UpdateRightsRequest, ) -> TaskResult[UpdateRightsResponse]: """Update terms of an existing rights acquisition. Modifies a previously acquired rights record — typically to extend the ``end_date``, raise the ``impression_cap``, pause/unpause via ``paused``, or swap to a compatible ``pricing_option_id``. Partial update: pass only the fields you want to change. Failure modes (surface as ``TaskResult`` with ``success=False``): * Acquisition is expired or revoked — the seller rejects the update outright; mint a fresh ``acquire_rights`` instead. * ``pricing_option_id`` swap to an incompatible option — rejected; the new option's terms must be a strict superset / compatible with the original acquisition. * No partial-state mutations on rejection: the acquisition remains at its prior state when any field fails validation. Args: request: Request with ``rights_id`` and at least one mutable field (``end_date``, ``impression_cap``, ``paused``, or ``pricing_option_id``). Returns: TaskResult containing UpdateRightsResponse (updated or error). """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_rights", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.update_rights(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_rights", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, UpdateRightsResponse) # ======================================================================== # V3 Protocol Methods - Compliance # ======================================================================== async def comply_test_controller( self, request: ComplyTestControllerRequest, ) -> TaskResult[ComplyTestControllerResponse]: """Compliance test controller for sandbox testing. Enables sellers to simulate state transitions and delivery data in a sandbox environment for compliance testing. Args: request: Request specifying scenario and parameters. Returns: TaskResult containing ComplyTestControllerResponse. """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="comply_test_controller", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.comply_test_controller(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="comply_test_controller", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ComplyTestControllerResponse) async def list_tools(self) -> list[str]: """ List available tools from the agent. Returns: List of tool names """ return await self.adapter.list_tools() async def get_info(self) -> dict[str, Any]: """ Get agent information including AdCP extension metadata. Returns agent card information including: - Agent name, description, version - Protocol type (mcp or a2a) - AdCP version (from extensions.adcp.adcp_version) - Supported protocols (from extensions.adcp.protocols_supported) - Available tools/skills Returns: Dictionary with agent metadata """ return await self.adapter.get_agent_info() async def close(self) -> None: """Close the adapter and clean up resources.""" if hasattr(self.adapter, "close"): logger.debug(f"Closing adapter for agent {self.agent_config.id}") await self.adapter.close() async def __aenter__(self) -> ADCPClient: """Async context manager entry.""" return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """Async context manager exit.""" await self.close() def _verify_webhook_signature( self, payload: dict[str, Any], signature: str, timestamp: str, raw_body: bytes | str | None = None, ) -> bool: """ Verify HMAC-SHA256 signature of webhook payload. The verification algorithm matches get_adcp_signed_headers_for_webhook: 1. Constructs message as "{timestamp}.{raw_http_body_bytes}" 2. HMAC-SHA256 signs with the shared secret 3. Compares against the provided signature (with "sha256=" prefix stripped) using constant-time comparison. Per AdCP spec (adcontextprotocol/adcp#2478): verifiers MUST use the raw HTTP body bytes captured before any JSON parse; they SHOULD NOT re-serialize a parsed payload to reconstruct the signed bytes, because re-serialization silently fails against signers whose output differs in separator choice, key order, unicode escapes, or number formatting — masking signer bugs the verifier should surface. Callers that genuinely cannot capture raw bytes MUST fail closed. This implementation therefore rejects verification attempts that don't supply ``raw_body``. Capture it from your framework's pre-parse hook (FastAPI ``Request.body()``, Flask ``request.get_data(cache=True)``, aiohttp ``Request.read()``, Express ``express.raw()``). Args: payload: Parsed webhook payload dict (not used for signing; kept for signature parity with callers, but verification derives solely from ``raw_body``). signature: Signature to verify (with or without "sha256=" prefix) timestamp: Unix timestamp in seconds from X-AdCP-Timestamp header raw_body: Raw HTTP request body bytes as received on the wire, captured before any JSON parse. Required. Returns: True if signature is valid, False otherwise (including when ``raw_body`` is missing — fails closed per spec). """ if not self.webhook_secret: logger.warning("Webhook signature verification skipped: no webhook_secret configured") return True # Fail closed per adcontextprotocol/adcp#2478: verifiers that cannot # capture raw bytes MUST reject, surfacing the infrastructure gap # rather than silently reconstructing a signed body that may diverge # from the bytes the signer actually hashed. if raw_body is None: logger.error( "Webhook signature verification failed: raw_body is required. " "Capture the raw HTTP body pre-parse and pass it to " "handle_webhook(raw_body=...). See " "https://adcontextprotocol.org/docs/building/implementation/security" "#legacy-hmac-sha256-fallback-deprecated-removed-in-40" ) return False # Reject stale or future timestamps to prevent replay attacks try: ts = int(timestamp) except (ValueError, TypeError): return False now = int(time.time()) if abs(now - ts) > self.webhook_timestamp_tolerance: return False # Strip "sha256=" prefix if present if signature.startswith("sha256="): signature = signature[7:] payload_str = raw_body.decode("utf-8") if isinstance(raw_body, bytes) else raw_body # Construct signed message: timestamp.payload signed_message = f"{timestamp}.{payload_str}" # Generate expected signature expected_signature = hmac.new( self.webhook_secret.encode("utf-8"), signed_message.encode("utf-8"), hashlib.sha256 ).hexdigest() return hmac.compare_digest(signature, expected_signature) def _parse_webhook_result( self, task_id: str, task_type: str, operation_id: str, status: GeneratedTaskStatus, result: Any, timestamp: datetime | str, message: str | None, context_id: str | None, ) -> TaskResult[AdcpAsyncResponseData]: """ Parse webhook data into typed TaskResult based on task_type. Args: task_id: Unique identifier for this task task_type: Task type from application routing (e.g., "get_products") operation_id: Operation identifier from application routing status: Current task status result: Task-specific payload (AdCP response data) timestamp: ISO 8601 timestamp when webhook was generated message: Human-readable summary of task state context_id: Session/conversation identifier Returns: TaskResult with task-specific typed response data Note: This method works with both MCP and A2A protocols by accepting protocol-agnostic parameters rather than protocol-specific objects. """ from adcp.utils.response_parser import parse_json_or_text # Map task types to their response types (using string literals, not enum) # Note: Some response types are Union types (e.g., ActivateSignalResponse = Success | Error) response_type_map: dict[str, type[BaseModel] | Any] = { # Core operations "get_products": GetProductsResponse, "list_creative_formats": ListCreativeFormatsResponse, "sync_creatives": SyncCreativesResponse, "list_creatives": ListCreativesResponse, "build_creative": BuildCreativeResponse, "preview_creative": PreviewCreativeResponse, "create_media_buy": CreateMediaBuyResponse, "update_media_buy": UpdateMediaBuyResponse, "get_media_buy_delivery": GetMediaBuyDeliveryResponse, "get_media_buys": GetMediaBuysResponse, "get_signals": GetSignalsResponse, "activate_signal": ActivateSignalResponse, "provide_performance_feedback": ProvidePerformanceFeedbackResponse, "report_usage": ReportUsageResponse, "get_account_financials": GetAccountFinancialsResponse, "list_accounts": ListAccountsResponse, "sync_accounts": SyncAccountsResponse, "log_event": LogEventResponse, "sync_event_sources": SyncEventSourcesResponse, "sync_audiences": SyncAudiencesResponse, "sync_catalogs": SyncCatalogsResponse, "get_creative_delivery": GetCreativeDeliveryResponse, # V3 Protocol Discovery "get_adcp_capabilities": GetAdcpCapabilitiesResponse, # V3 Content Standards "create_content_standards": CreateContentStandardsResponse, "get_content_standards": GetContentStandardsResponse, "list_content_standards": ListContentStandardsResponse, "update_content_standards": UpdateContentStandardsResponse, "calibrate_content": CalibrateContentResponse, "validate_content_delivery": ValidateContentDeliveryResponse, "get_media_buy_artifacts": GetMediaBuyArtifactsResponse, # V3 Sponsored Intelligence "si_get_offering": SiGetOfferingResponse, "si_initiate_session": SiInitiateSessionResponse, "si_send_message": SiSendMessageResponse, "si_terminate_session": SiTerminateSessionResponse, # V3 Governance "get_creative_features": GetCreativeFeaturesResponse, "sync_plans": SyncPlansResponse, "check_governance": CheckGovernanceResponse, "report_plan_outcome": ReportPlanOutcomeResponse, "get_plan_audit_logs": GetPlanAuditLogsResponse, "create_property_list": CreatePropertyListResponse, "get_property_list": GetPropertyListResponse, "list_property_lists": ListPropertyListsResponse, "update_property_list": UpdatePropertyListResponse, "delete_property_list": DeletePropertyListResponse, # TMP "context_match": ContextMatchResponse, "identity_match": IdentityMatchResponse, # Brand Rights "get_brand_identity": GetBrandIdentityResponse, "get_rights": GetRightsResponse, "acquire_rights": AcquireRightsResponse, "update_rights": UpdateRightsResponse, # Compliance "comply_test_controller": ComplyTestControllerResponse, } # Handle completed tasks with result parsing if status == GeneratedTaskStatus.completed and result is not None: response_type = response_type_map.get(task_type) if response_type: try: parsed_result: Any = parse_json_or_text(result, response_type) return TaskResult[AdcpAsyncResponseData]( status=TaskStatus.COMPLETED, data=parsed_result, success=True, metadata={ "task_id": task_id, "operation_id": operation_id, "timestamp": timestamp, "message": message, }, ) except ValueError as e: logger.warning(f"Failed to parse webhook result: {e}") # Fall through to untyped result # Handle failed, input-required, or unparseable results # Convert status to core TaskStatus enum status_map = { GeneratedTaskStatus.completed: TaskStatus.COMPLETED, GeneratedTaskStatus.submitted: TaskStatus.SUBMITTED, GeneratedTaskStatus.working: TaskStatus.WORKING, GeneratedTaskStatus.failed: TaskStatus.FAILED, GeneratedTaskStatus.input_required: TaskStatus.NEEDS_INPUT, } task_status = status_map.get(status, TaskStatus.FAILED) # Extract error message from result.errors if present error_message: str | None = None if result is not None and hasattr(result, "errors"): errors = getattr(result, "errors", None) if errors and len(errors) > 0: first_error = errors[0] if hasattr(first_error, "message"): error_message = first_error.message return TaskResult[AdcpAsyncResponseData]( status=task_status, data=result, success=status == GeneratedTaskStatus.completed, error=error_message, metadata={ "task_id": task_id, "operation_id": operation_id, "timestamp": timestamp, "message": message, "context_id": context_id, }, ) async def _handle_mcp_webhook( self, payload: dict[str, Any], task_type: str, operation_id: str, signature: str | None, timestamp: str | None = None, raw_body: bytes | str | None = None, ) -> TaskResult[AdcpAsyncResponseData]: """ Handle MCP webhook delivered via HTTP POST. Args: payload: Webhook payload dict task_type: Task type from application routing operation_id: Operation identifier from application routing signature: Optional HMAC-SHA256 signature for verification (X-AdCP-Signature header) timestamp: Optional Unix timestamp for signature verification (X-AdCP-Timestamp header) raw_body: Optional raw HTTP request body for signature verification Returns: TaskResult with parsed task-specific response data Raises: ADCPWebhookSignatureError: If signature verification fails ValidationError: If payload doesn't match McpWebhookPayload schema """ from adcp.types.generated_poc.core.mcp_webhook_payload import McpWebhookPayload # When a webhook_secret is configured, require signed webhooks if self.webhook_secret: if not signature or not timestamp: raise ADCPWebhookSignatureError( "Webhook signature and timestamp headers are required" ) if not self._verify_webhook_signature(payload, signature, timestamp, raw_body): logger.warning( f"Webhook signature verification failed for agent {self.agent_config.id}" ) raise ADCPWebhookSignatureError("Invalid webhook signature") # Validate and parse MCP webhook payload webhook = McpWebhookPayload.model_validate(payload) # Emit activity for monitoring self._emit_activity( Activity( type=ActivityType.WEBHOOK_RECEIVED, operation_id=operation_id, agent_id=self.agent_config.id, task_type=task_type, timestamp=datetime.now(timezone.utc).isoformat(), metadata={"payload": payload, "protocol": "mcp"}, ) ) # Extract fields and parse result return self._parse_webhook_result( task_id=webhook.task_id, task_type=task_type, operation_id=operation_id, status=webhook.status, result=webhook.result, timestamp=webhook.timestamp, message=webhook.message, context_id=webhook.context_id, ) async def _handle_a2a_webhook( self, payload: Task | TaskStatusUpdateEvent, task_type: str, operation_id: str ) -> TaskResult[AdcpAsyncResponseData]: """ Handle A2A webhook delivered through Task or TaskStatusUpdateEvent. Per A2A specification: - Terminated statuses (completed, failed): Payload is Task with artifacts[].parts[] - Intermediate statuses (working, input-required, submitted): Payload is TaskStatusUpdateEvent with status.message.parts[] Args: payload: A2A Task or TaskStatusUpdateEvent object task_type: Task type from application routing operation_id: Operation identifier from application routing Returns: TaskResult with parsed task-specific response data Note: Signature verification is NOT applicable for A2A webhooks as they arrive through authenticated A2A connections, not HTTP. """ from a2a.types import DataPart, TextPart adcp_data: Any = None text_message: str | None = None task_id: str context_id: str | None status_state: str timestamp: datetime | str # Type detection and extraction based on payload type if isinstance(payload, TaskStatusUpdateEvent): # Intermediate status: Extract from status.message.parts[] task_id = payload.task_id context_id = payload.context_id status_state = payload.status.state if payload.status else "failed" timestamp = ( payload.status.timestamp if payload.status and payload.status.timestamp else datetime.now(timezone.utc) ) # Extract from status.message.parts[] if payload.status and payload.status.message and payload.status.message.parts: # Extract DataPart for structured AdCP payload data_parts = [ p.root for p in payload.status.message.parts if isinstance(p.root, DataPart) ] if data_parts: # Use last DataPart as authoritative last_data_part = data_parts[-1] adcp_data = last_data_part.data # Unwrap {"response": {...}} wrapper if present (ADK pattern) if isinstance(adcp_data, dict) and "response" in adcp_data: adcp_data = adcp_data["response"] # Extract TextPart for human-readable message for part in payload.status.message.parts: if isinstance(part.root, TextPart): text_message = part.root.text break else: # Terminated status (Task): Extract from artifacts[].parts[] task_id = payload.id context_id = payload.context_id status_state = payload.status.state if payload.status else "failed" timestamp = ( payload.status.timestamp if payload.status and payload.status.timestamp else datetime.now(timezone.utc) ) # Extract from task.artifacts[].parts[] # Following A2A spec: use last artifact, last DataPart is authoritative if payload.artifacts: # Use last artifact (most recent in streaming scenarios) target_artifact = payload.artifacts[-1] if target_artifact.parts: # Extract DataPart for structured AdCP payload data_parts = [ p.root for p in target_artifact.parts if isinstance(p.root, DataPart) ] if data_parts: # Use last DataPart as authoritative last_data_part = data_parts[-1] adcp_data = last_data_part.data # Unwrap {"response": {...}} wrapper if present (ADK pattern) if isinstance(adcp_data, dict) and "response" in adcp_data: adcp_data = adcp_data["response"] # Extract TextPart for human-readable message for part in target_artifact.parts: if isinstance(part.root, TextPart): text_message = part.root.text break # Map A2A status.state to GeneratedTaskStatus enum status_map = { "completed": GeneratedTaskStatus.completed, "submitted": GeneratedTaskStatus.submitted, "working": GeneratedTaskStatus.working, "failed": GeneratedTaskStatus.failed, "input-required": GeneratedTaskStatus.input_required, "input_required": GeneratedTaskStatus.input_required, # Handle both formats } mapped_status = status_map.get(status_state, GeneratedTaskStatus.failed) # Emit activity for monitoring self._emit_activity( Activity( type=ActivityType.WEBHOOK_RECEIVED, operation_id=operation_id, agent_id=self.agent_config.id, task_type=task_type, timestamp=datetime.now(timezone.utc).isoformat(), metadata={ "task_id": task_id, "protocol": "a2a", "payload_type": ( "TaskStatusUpdateEvent" if isinstance(payload, TaskStatusUpdateEvent) else "Task" ), }, ) ) # Parse and return typed result by passing extracted fields directly return self._parse_webhook_result( task_id=task_id, task_type=task_type, operation_id=operation_id, status=mapped_status, result=adcp_data, timestamp=timestamp, message=text_message, context_id=context_id, ) async def handle_webhook( self, payload: dict[str, Any] | Task | TaskStatusUpdateEvent, task_type: str, operation_id: str, signature: str | None = None, timestamp: str | None = None, raw_body: bytes | str | None = None, ) -> TaskResult[AdcpAsyncResponseData]: """ Handle incoming webhook and return typed result. This method provides a unified interface for handling webhooks from both MCP and A2A protocols: - MCP Webhooks: HTTP POST with dict payload, optional HMAC signature - A2A Webhooks: Task or TaskStatusUpdateEvent objects based on status The method automatically detects the protocol type and routes to the appropriate handler. Both protocols return a consistent TaskResult structure with typed AdCP response data. Args: payload: Webhook payload - one of: - dict[str, Any]: MCP webhook payload from HTTP POST - Task: A2A webhook for terminated statuses (completed, failed) - TaskStatusUpdateEvent: A2A webhook for intermediate statuses (working, input-required, submitted) task_type: Task type from application routing (e.g., "get_products"). Applications should extract this from URL routing pattern: /webhook/{task_type}/{agent_id}/{operation_id} operation_id: Operation identifier from application routing. Used to correlate webhook notifications with original task submission. signature: Optional HMAC-SHA256 signature for MCP webhook verification (X-AdCP-Signature header). Ignored for A2A webhooks. timestamp: Optional Unix timestamp (seconds) for MCP webhook signature verification (X-AdCP-Timestamp header). Required when signature is provided. raw_body: Optional raw HTTP request body bytes for signature verification. When provided, used directly instead of re-serializing the payload, avoiding cross-language JSON serialization mismatches. Strongly recommended for production use. Returns: TaskResult with parsed task-specific response data. The structure is identical regardless of protocol. Raises: ADCPWebhookSignatureError: If MCP signature verification fails ValidationError: If MCP payload doesn't match WebhookPayload schema Note: task_type and operation_id were deprecated from the webhook payload per AdCP specification. Applications must extract these from URL routing and pass them explicitly. Examples: MCP webhook (HTTP endpoint): >>> @app.post("/webhook/{task_type}/{agent_id}/{operation_id}") >>> async def webhook_handler(task_type: str, operation_id: str, request: Request): >>> raw_body = await request.body() >>> payload = json.loads(raw_body) >>> signature = request.headers.get("X-AdCP-Signature") >>> timestamp = request.headers.get("X-AdCP-Timestamp") >>> result = await client.handle_webhook( >>> payload, task_type, operation_id, signature, timestamp, >>> raw_body=raw_body, >>> ) >>> if result.success: >>> print(f"Task completed: {result.data}") A2A webhook with Task (terminated status): >>> async def on_task_completed(task: Task): >>> # Extract task_type and operation_id from your app's task tracking >>> task_type = your_task_registry.get_type(task.id) >>> operation_id = your_task_registry.get_operation_id(task.id) >>> result = await client.handle_webhook( >>> task, task_type, operation_id >>> ) >>> if result.success: >>> print(f"Task completed: {result.data}") A2A webhook with TaskStatusUpdateEvent (intermediate status): >>> async def on_task_update(event: TaskStatusUpdateEvent): >>> # Extract task_type and operation_id from your app's task tracking >>> task_type = your_task_registry.get_type(event.task_id) >>> operation_id = your_task_registry.get_operation_id(event.task_id) >>> result = await client.handle_webhook( >>> event, task_type, operation_id >>> ) >>> if result.status == GeneratedTaskStatus.working: >>> print(f"Task still working: {result.metadata.get('message')}") """ # Detect protocol type and route to appropriate handler if isinstance(payload, (Task, TaskStatusUpdateEvent)): # A2A webhook (Task or TaskStatusUpdateEvent) return await self._handle_a2a_webhook(payload, task_type, operation_id) else: # MCP webhook (dict payload) return await self._handle_mcp_webhook( payload, task_type, operation_id, signature, timestamp, raw_body )Client for interacting with a single AdCP agent.
Initialize ADCP client for a single agent.
Args
agent_config- Agent configuration
webhook_url_template- Template for webhook URLs with {agent_id}, {task_type}, {operation_id}
webhook_secret- Secret for webhook signature verification
on_activity- Callback for activity events
webhook_timestamp_tolerance- Maximum age (in seconds) for webhook timestamps. Webhooks with timestamps older than this or more than this far in the future are rejected. Defaults to 300 (5 minutes).
capabilities_ttl- Time-to-live in seconds for cached capabilities (default: 1 hour)
validate_features- When True, automatically check that the seller supports required features before making task calls (e.g., sync_audiences requires audience_targeting). Requires capabilities to have been fetched first.
strict_idempotency- When True, verify the seller declared
adcp.idempotency.replay_ttl_secondsin capabilities before any mutating call. Fetches capabilities lazily on first use. RaisesIdempotencyUnsupportedErrorif the declaration is missing — sellers that don't declare it provide no retry-safety guarantee per AdCP #2315. Defaults to False for backward compatibility. signing- Optional RFC 9421 request-signing config. When provided,
the client automatically attaches
Signature/Signature-Input/Content-Digestheaders to operations the seller'srequest_signingcapability lists inrequired_for,warn_for, orsupported_for. The seller'scovers_content_digestpolicy determines whether the body is bound to the signature. Generate a key withadcp-keygenand publish the public JWK at yourjwks_uri. Supported on both A2A and MCP (mcp_transport="streamable_http"); SSE-transport MCP logs a warning and falls through unsigned.
Instance variables
prop capabilities : GetAdcpCapabilitiesResponse | None-
Expand source code
@property def capabilities(self) -> GetAdcpCapabilitiesResponse | None: """Return cached capabilities, or None if not yet fetched.""" return self._capabilitiesReturn cached capabilities, or None if not yet fetched.
prop feature_resolver : FeatureResolver | None-
Expand source code
@property def feature_resolver(self) -> FeatureResolver | None: """Return the FeatureResolver for cached capabilities, or None.""" return self._feature_resolverReturn the FeatureResolver for cached capabilities, or None.
Methods
async def acquire_rights(self, request: AcquireRightsRequest) ‑> TaskResult[Union[AcquireRightsResponse1, AcquireRightsResponse2, AcquireRightsResponse3, AcquireRightsResponse4]]-
Expand source code
async def acquire_rights( self, request: AcquireRightsRequest, ) -> TaskResult[AcquireRightsResponse]: """Acquire rights for brand content usage. Binding contractual request to license rights for a campaign. Returns credentials for generating rights-cleared content. Args: request: Request with rights_id, pricing_option_id, buyer, campaign, and revocation_webhook. Returns: TaskResult containing AcquireRightsResponse (acquired, pending_approval, rejected, or error). """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="acquire_rights", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.acquire_rights(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="acquire_rights", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, AcquireRightsResponse)Acquire rights for brand content usage.
Binding contractual request to license rights for a campaign. Returns credentials for generating rights-cleared content.
Args
request- Request with rights_id, pricing_option_id, buyer, campaign, and revocation_webhook.
Returns
TaskResult containing AcquireRightsResponse (acquired, pending_approval, rejected, or error).
async def activate_signal(self, request: ActivateSignalRequest) ‑> TaskResult[Union[ActivateSignalResponse1, ActivateSignalResponse2]]-
Expand source code
async def activate_signal( self, request: ActivateSignalRequest, ) -> TaskResult[ActivateSignalResponse]: """ Activate Signal. Args: request: Request parameters Returns: TaskResult containing ActivateSignalResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="activate_signal", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.activate_signal(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="activate_signal", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ActivateSignalResponse)Activate Signal.
Args
request- Request parameters
Returns
TaskResult containing ActivateSignalResponse
async def build_creative(self, request: BuildCreativeRequest) ‑> TaskResult[Union[BuildCreativeResponse1, BuildCreativeResponse2, BuildCreativeResponse3]]-
Expand source code
async def build_creative( self, request: BuildCreativeRequest, ) -> TaskResult[BuildCreativeResponse]: """ Generate production-ready creative assets. Requests the creative agent to build final deliverable assets in the target format (e.g., VAST, DAAST, HTML5). This is typically called after previewing and approving a creative manifest. Args: request: Creative build parameters including: - manifest: Creative manifest with brand info and content - target_format_id: Desired output format identifier - inputs: Optional user-provided inputs for template variables - deployment: Platform or agent deployment configuration Returns: TaskResult containing BuildCreativeResponse with: - assets: Production-ready creative files (URLs or inline content) - format_id: The generated format identifier - manifest: The creative manifest used for generation - metadata: Additional platform-specific details Example: >>> from adcp import ADCPClient, BuildCreativeRequest >>> client = ADCPClient(agent_config) >>> request = BuildCreativeRequest( ... manifest=creative_manifest, ... target_format_id="vast_2.0", ... inputs={"duration": 30} ... ) >>> result = await client.build_creative(request) >>> if result.success: ... vast_url = result.data.assets[0].url """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="build_creative", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.build_creative(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="build_creative", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, BuildCreativeResponse)Generate production-ready creative assets.
Requests the creative agent to build final deliverable assets in the target format (e.g., VAST, DAAST, HTML5). This is typically called after previewing and approving a creative manifest.
Args
request- Creative build parameters including: - manifest: Creative manifest with brand info and content - target_format_id: Desired output format identifier - inputs: Optional user-provided inputs for template variables - deployment: Platform or agent deployment configuration
Returns
TaskResult containing BuildCreativeResponse with: - assets: Production-ready creative files (URLs or inline content) - format_id: The generated format identifier - manifest: The creative manifest used for generation - metadata: Additional platform-specific details
Example
>>> from adcp import ADCPClient, BuildCreativeRequest >>> client = ADCPClient(agent_config) >>> request = BuildCreativeRequest( ... manifest=creative_manifest, ... target_format_id="vast_2.0", ... inputs={"duration": 30} ... ) >>> result = await client.build_creative(request) >>> if result.success: ... vast_url = result.data.assets[0].url async def calibrate_content(self, request: CalibrateContentRequest) ‑> TaskResult[Union[CalibrateContentResponse1, CalibrateContentResponse2]]-
Expand source code
async def calibrate_content( self, request: CalibrateContentRequest, ) -> TaskResult[CalibrateContentResponse]: """ Calibrate content against standards. Evaluates content (artifact or URL) against configured standards to determine suitability for ad placement. Args: request: Request parameters including content to evaluate Returns: TaskResult containing CalibrateContentResponse with verdict """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="calibrate_content", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.calibrate_content(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="calibrate_content", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CalibrateContentResponse)Calibrate content against standards.
Evaluates content (artifact or URL) against configured standards to determine suitability for ad placement.
Args
request- Request parameters including content to evaluate
Returns
TaskResult containing CalibrateContentResponse with verdict
async def check_governance(self, request: CheckGovernanceRequest) ‑> TaskResult[CheckGovernanceResponse]-
Expand source code
async def check_governance( self, request: CheckGovernanceRequest, ) -> TaskResult[CheckGovernanceResponse]: """Check a proposed or committed action against campaign governance.""" operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="check_governance", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.check_governance(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="check_governance", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CheckGovernanceResponse)Check a proposed or committed action against campaign governance.
async def close(self) ‑> None-
Expand source code
async def close(self) -> None: """Close the adapter and clean up resources.""" if hasattr(self.adapter, "close"): logger.debug(f"Closing adapter for agent {self.agent_config.id}") await self.adapter.close()Close the adapter and clean up resources.
async def comply_test_controller(self, request: ComplyTestControllerRequest) ‑> TaskResult[Union[ComplyTestControllerResponse1, ComplyTestControllerResponse2, ComplyTestControllerResponse3, ComplyTestControllerResponse4]]-
Expand source code
async def comply_test_controller( self, request: ComplyTestControllerRequest, ) -> TaskResult[ComplyTestControllerResponse]: """Compliance test controller for sandbox testing. Enables sellers to simulate state transitions and delivery data in a sandbox environment for compliance testing. Args: request: Request specifying scenario and parameters. Returns: TaskResult containing ComplyTestControllerResponse. """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="comply_test_controller", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.comply_test_controller(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="comply_test_controller", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ComplyTestControllerResponse)Compliance test controller for sandbox testing.
Enables sellers to simulate state transitions and delivery data in a sandbox environment for compliance testing.
Args
request- Request specifying scenario and parameters.
Returns
TaskResult containing ComplyTestControllerResponse.
async def context_match(self, request: ContextMatchRequest) ‑> TaskResult[ContextMatchResponse]-
Expand source code
async def context_match( self, request: ContextMatchRequest, ) -> TaskResult[ContextMatchResponse]: """Match ad context to buyer packages. Evaluates contextual signals for a publisher placement against the buyer's active packages and returns matching offers. Args: request: Context match request with placement, property, and optional artifact refs, context signals, and geo data. Returns: TaskResult containing ContextMatchResponse with offers. """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True, by_alias=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="context_match", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.context_match(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="context_match", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ContextMatchResponse)Match ad context to buyer packages.
Evaluates contextual signals for a publisher placement against the buyer's active packages and returns matching offers.
Args
request- Context match request with placement, property, and optional artifact refs, context signals, and geo data.
Returns
TaskResult containing ContextMatchResponse with offers.
async def create_collection_list(self, request: CreateCollectionListRequest) ‑> TaskResult[CreateCollectionListResponse]-
Expand source code
async def create_collection_list( self, request: CreateCollectionListRequest, ) -> TaskResult[CreateCollectionListResponse]: """Create a collection list for governance filtering. Collection lists define dynamic sets of collections (properties, segments, etc.) that can be referenced by authorization rules and audience scoping. Args: request: Request parameters for creating the collection list Returns: TaskResult containing CreateCollectionListResponse with list_id """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_collection_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.create_collection_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_collection_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CreateCollectionListResponse)Create a collection list for governance filtering.
Collection lists define dynamic sets of collections (properties, segments, etc.) that can be referenced by authorization rules and audience scoping.
Args
request- Request parameters for creating the collection list
Returns
TaskResult containing CreateCollectionListResponse with list_id
async def create_content_standards(self, request: CreateContentStandardsRequest) ‑> TaskResult[Union[CreateContentStandardsResponse1, CreateContentStandardsResponse2]]-
Expand source code
async def create_content_standards( self, request: CreateContentStandardsRequest, ) -> TaskResult[CreateContentStandardsResponse]: """ Create a new content standards configuration. Defines acceptable content contexts for ad placement using natural language policy and optional calibration exemplars. Args: request: Request parameters including policy and scope Returns: TaskResult containing CreateContentStandardsResponse with standards_id """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_content_standards", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.create_content_standards(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_content_standards", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CreateContentStandardsResponse)Create a new content standards configuration.
Defines acceptable content contexts for ad placement using natural language policy and optional calibration exemplars.
Args
request- Request parameters including policy and scope
Returns
TaskResult containing CreateContentStandardsResponse with standards_id
async def create_media_buy(self, request: CreateMediaBuyRequest) ‑> TaskResult[Union[CreateMediaBuyResponse1, CreateMediaBuyResponse2, CreateMediaBuyResponse3]]-
Expand source code
async def create_media_buy( self, request: CreateMediaBuyRequest, ) -> TaskResult[CreateMediaBuyResponse]: """ Create a new media buy reservation. Requests the agent to reserve inventory for a campaign. The agent returns a media_buy_id that tracks this reservation and can be used for updates. Args: request: Media buy creation parameters including: - brand: Brand reference; resolved from brand.json or the registry at execution - packages: List of package requests specifying desired inventory - publisher_properties: Target properties for ad placement - budget: Optional budget constraints - start_date/end_date: Campaign flight dates Returns: TaskResult containing CreateMediaBuyResponse with: - media_buy_id: Unique identifier for this reservation - status: Current state of the media buy - packages: Confirmed package details - Additional platform-specific metadata Example: >>> from adcp import ADCPClient, CreateMediaBuyRequest, BrandReference >>> client = ADCPClient(agent_config) >>> request = CreateMediaBuyRequest( ... brand=BrandReference(domain="acme.com"), ... packages=[package_request], ... publisher_properties=properties, ... ) >>> result = await client.create_media_buy(request) >>> if result.success: ... media_buy_id = result.data.media_buy_id """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_media_buy", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.create_media_buy(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_media_buy", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CreateMediaBuyResponse)Create a new media buy reservation.
Requests the agent to reserve inventory for a campaign. The agent returns a media_buy_id that tracks this reservation and can be used for updates.
Args
request- Media buy creation parameters including: - brand: Brand reference; resolved from brand.json or the registry at execution - packages: List of package requests specifying desired inventory - publisher_properties: Target properties for ad placement - budget: Optional budget constraints - start_date/end_date: Campaign flight dates
Returns
TaskResult containing CreateMediaBuyResponse with: - media_buy_id: Unique identifier for this reservation - status: Current state of the media buy - packages: Confirmed package details - Additional platform-specific metadata
Example
>>> from adcp import ADCPClient, CreateMediaBuyRequest, BrandReference >>> client = ADCPClient(agent_config) >>> request = CreateMediaBuyRequest( ... brand=BrandReference(domain="acme.com"), ... packages=[package_request], ... publisher_properties=properties, ... ) >>> result = await client.create_media_buy(request) >>> if result.success: ... media_buy_id = result.data.media_buy_id async def create_property_list(self, request: CreatePropertyListRequest) ‑> TaskResult[CreatePropertyListResponse]-
Expand source code
async def create_property_list( self, request: CreatePropertyListRequest, ) -> TaskResult[CreatePropertyListResponse]: """ Create a property list for governance filtering. Property lists define dynamic sets of properties based on filters, brand manifests, and feature requirements. Args: request: Request parameters for creating the property list Returns: TaskResult containing CreatePropertyListResponse with list_id """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_property_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.create_property_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="create_property_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, CreatePropertyListResponse)Create a property list for governance filtering.
Property lists define dynamic sets of properties based on filters, brand manifests, and feature requirements.
Args
request- Request parameters for creating the property list
Returns
TaskResult containing CreatePropertyListResponse with list_id
async def delete_collection_list(self, request: DeleteCollectionListRequest) ‑> TaskResult[DeleteCollectionListResponse]-
Expand source code
async def delete_collection_list( self, request: DeleteCollectionListRequest, ) -> TaskResult[DeleteCollectionListResponse]: """Delete a collection list. Args: request: Request parameters with list_id Returns: TaskResult containing DeleteCollectionListResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="delete_collection_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.delete_collection_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="delete_collection_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, DeleteCollectionListResponse)Delete a collection list.
Args
request- Request parameters with list_id
Returns
TaskResult containing DeleteCollectionListResponse
async def delete_property_list(self, request: DeletePropertyListRequest) ‑> TaskResult[DeletePropertyListResponse]-
Expand source code
async def delete_property_list( self, request: DeletePropertyListRequest, ) -> TaskResult[DeletePropertyListResponse]: """ Delete a property list. Removes a property list. Any active subscriptions to this list will be terminated. Args: request: Request parameters with list_id Returns: TaskResult containing DeletePropertyListResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="delete_property_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.delete_property_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="delete_property_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, DeletePropertyListResponse)Delete a property list.
Removes a property list. Any active subscriptions to this list will be terminated.
Args
request- Request parameters with list_id
Returns
TaskResult containing DeletePropertyListResponse
async def fetch_capabilities(self) ‑> GetAdcpCapabilitiesResponse-
Expand source code
async def fetch_capabilities(self) -> GetAdcpCapabilitiesResponse: """Fetch capabilities, using cache if still valid. Returns: The seller's capabilities response. """ if self._capabilities is not None and self._capabilities_fetched_at is not None: elapsed = time.monotonic() - self._capabilities_fetched_at if elapsed < self.capabilities_ttl: return self._capabilities return await self.refresh_capabilities()Fetch capabilities, using cache if still valid.
Returns
The seller's capabilities response.
async def get_account_financials(self, request: GetAccountFinancialsRequest) ‑> TaskResult[Union[GetAccountFinancialsResponse1, GetAccountFinancialsResponse2]]-
Expand source code
async def get_account_financials( self, request: GetAccountFinancialsRequest, ) -> TaskResult[GetAccountFinancialsResponse]: """ Get Account Financials. Args: request: Request parameters Returns: TaskResult containing GetAccountFinancialsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_account_financials", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_account_financials(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_account_financials", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetAccountFinancialsResponse)Get Account Financials.
Args
request- Request parameters
Returns
TaskResult containing GetAccountFinancialsResponse
async def get_adcp_capabilities(self, request: GetAdcpCapabilitiesRequest) ‑> TaskResult[GetAdcpCapabilitiesResponse]-
Expand source code
async def get_adcp_capabilities( self, request: GetAdcpCapabilitiesRequest, ) -> TaskResult[GetAdcpCapabilitiesResponse]: """ Get AdCP capabilities from the agent. Queries the agent's supported AdCP features, protocol versions, and domain-specific capabilities (media_buy, signals, sponsored_intelligence). Args: request: Request parameters including optional protocol filters Returns: TaskResult containing GetAdcpCapabilitiesResponse with: - adcp: Core protocol version information - supported_protocols: List of supported domain protocols - media_buy: Media buy capabilities (if supported) - sponsored_intelligence: SI capabilities (if supported) - signals: Signals capabilities (if supported) """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_adcp_capabilities", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_adcp_capabilities(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_adcp_capabilities", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetAdcpCapabilitiesResponse)Get AdCP capabilities from the agent.
Queries the agent's supported AdCP features, protocol versions, and domain-specific capabilities (media_buy, signals, sponsored_intelligence).
Args
request- Request parameters including optional protocol filters
Returns
TaskResult containing GetAdcpCapabilitiesResponse with: - adcp: Core protocol version information - supported_protocols: List of supported domain protocols - media_buy: Media buy capabilities (if supported) - sponsored_intelligence: SI capabilities (if supported) - signals: Signals capabilities (if supported)
async def get_brand_identity(self, request: GetBrandIdentityRequest) ‑> TaskResult[Union[GetBrandIdentityResponse1, GetBrandIdentityResponse2]]-
Expand source code
async def get_brand_identity( self, request: GetBrandIdentityRequest, ) -> TaskResult[GetBrandIdentityResponse]: """Get brand identity information. Retrieves brand identity data including logos, colors, fonts, voice synthesis config, and rights availability. Args: request: Request with brand_id and optional fields filter. Returns: TaskResult containing GetBrandIdentityResponse. """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_brand_identity", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_brand_identity(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_brand_identity", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetBrandIdentityResponse)Get brand identity information.
Retrieves brand identity data including logos, colors, fonts, voice synthesis config, and rights availability.
Args
request- Request with brand_id and optional fields filter.
Returns
TaskResult containing GetBrandIdentityResponse.
async def get_collection_list(self, request: GetCollectionListRequest) ‑> TaskResult[GetCollectionListResponse]-
Expand source code
async def get_collection_list( self, request: GetCollectionListRequest, ) -> TaskResult[GetCollectionListResponse]: """Get a collection list with optional resolution. When resolve=true, returns the resolved members of the collection list. Args: request: Request parameters including list_id and resolve flag Returns: TaskResult containing GetCollectionListResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_collection_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_collection_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_collection_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetCollectionListResponse)Get a collection list with optional resolution.
When resolve=true, returns the resolved members of the collection list.
Args
request- Request parameters including list_id and resolve flag
Returns
TaskResult containing GetCollectionListResponse
async def get_content_standards(self, request: GetContentStandardsRequest) ‑> TaskResult[Union[GetContentStandardsResponse1, GetContentStandardsResponse2]]-
Expand source code
async def get_content_standards( self, request: GetContentStandardsRequest, ) -> TaskResult[GetContentStandardsResponse]: """ Get a content standards configuration by ID. Args: request: Request parameters including standards_id Returns: TaskResult containing GetContentStandardsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_content_standards", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_content_standards(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_content_standards", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetContentStandardsResponse)Get a content standards configuration by ID.
Args
request- Request parameters including standards_id
Returns
TaskResult containing GetContentStandardsResponse
async def get_creative_delivery(self, request: GetCreativeDeliveryRequest) ‑> TaskResult[GetCreativeDeliveryResponse]-
Expand source code
async def get_creative_delivery( self, request: GetCreativeDeliveryRequest, ) -> TaskResult[GetCreativeDeliveryResponse]: """ Get Creative Delivery. Args: request: Request parameters Returns: TaskResult containing GetCreativeDeliveryResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_creative_delivery", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_creative_delivery(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_creative_delivery", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetCreativeDeliveryResponse)Get Creative Delivery.
Args
request- Request parameters
Returns
TaskResult containing GetCreativeDeliveryResponse
async def get_creative_features(self, request: GetCreativeFeaturesRequest) ‑> TaskResult[Union[GetCreativeFeaturesResponse1, GetCreativeFeaturesResponse2]]-
Expand source code
async def get_creative_features( self, request: GetCreativeFeaturesRequest, ) -> TaskResult[GetCreativeFeaturesResponse]: """Evaluate governance features for a creative manifest.""" operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_creative_features", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_creative_features(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_creative_features", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetCreativeFeaturesResponse)Evaluate governance features for a creative manifest.
async def get_info(self) ‑> dict[str, typing.Any]-
Expand source code
async def get_info(self) -> dict[str, Any]: """ Get agent information including AdCP extension metadata. Returns agent card information including: - Agent name, description, version - Protocol type (mcp or a2a) - AdCP version (from extensions.adcp.adcp_version) - Supported protocols (from extensions.adcp.protocols_supported) - Available tools/skills Returns: Dictionary with agent metadata """ return await self.adapter.get_agent_info()Get agent information including AdCP extension metadata.
Returns agent card information including: - Agent name, description, version - Protocol type (mcp or a2a) - AdCP version (from extensions.adcp.adcp_version) - Supported protocols (from extensions.adcp.protocols_supported) - Available tools/skills
Returns
Dictionary with agent metadata
async def get_media_buy_artifacts(self, request: GetMediaBuyArtifactsRequest) ‑> TaskResult[Union[GetMediaBuyArtifactsResponse1, GetMediaBuyArtifactsResponse2]]-
Expand source code
async def get_media_buy_artifacts( self, request: GetMediaBuyArtifactsRequest, ) -> TaskResult[GetMediaBuyArtifactsResponse]: """ Get artifacts associated with a media buy. Retrieves content artifacts where ads were delivered for a media buy. Args: request: Request parameters including media_buy_id Returns: TaskResult containing GetMediaBuyArtifactsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buy_artifacts", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_media_buy_artifacts(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buy_artifacts", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetMediaBuyArtifactsResponse)Get artifacts associated with a media buy.
Retrieves content artifacts where ads were delivered for a media buy.
Args
request- Request parameters including media_buy_id
Returns
TaskResult containing GetMediaBuyArtifactsResponse
async def get_media_buy_delivery(self, request: GetMediaBuyDeliveryRequest) ‑> TaskResult[GetMediaBuyDeliveryResponse]-
Expand source code
async def get_media_buy_delivery( self, request: GetMediaBuyDeliveryRequest, ) -> TaskResult[GetMediaBuyDeliveryResponse]: """ Get Media Buy Delivery. Args: request: Request parameters Returns: TaskResult containing GetMediaBuyDeliveryResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buy_delivery", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_media_buy_delivery(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buy_delivery", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetMediaBuyDeliveryResponse)Get Media Buy Delivery.
Args
request- Request parameters
Returns
TaskResult containing GetMediaBuyDeliveryResponse
async def get_media_buys(self, request: GetMediaBuysRequest) ‑> TaskResult[GetMediaBuysResponse]-
Expand source code
async def get_media_buys( self, request: GetMediaBuysRequest, ) -> TaskResult[GetMediaBuysResponse]: """ Get Media Buys. Args: request: Request parameters Returns: TaskResult containing GetMediaBuysResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buys", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_media_buys(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_media_buys", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetMediaBuysResponse)Get Media Buys.
Args
request- Request parameters
Returns
TaskResult containing GetMediaBuysResponse
async def get_plan_audit_logs(self, request: GetPlanAuditLogsRequest) ‑> TaskResult[GetPlanAuditLogsResponse]-
Expand source code
async def get_plan_audit_logs( self, request: GetPlanAuditLogsRequest, ) -> TaskResult[GetPlanAuditLogsResponse]: """Retrieve governance state and audit logs for one or more plans.""" operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_plan_audit_logs", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_plan_audit_logs(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_plan_audit_logs", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetPlanAuditLogsResponse)Retrieve governance state and audit logs for one or more plans.
async def get_products(self,
request: GetProductsRequest,
fetch_previews: bool = False,
preview_output_format: str = 'url',
creative_agent_client: ADCPClient | None = None) ‑> TaskResult[GetProductsResponse]-
Expand source code
async def get_products( self, request: GetProductsRequest, fetch_previews: bool = False, preview_output_format: str = "url", creative_agent_client: ADCPClient | None = None, ) -> TaskResult[GetProductsResponse]: """ Get advertising products. Args: request: Request parameters fetch_previews: If True, generate preview URLs for each product's formats (uses batch API for 5-10x performance improvement) preview_output_format: "url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead) creative_agent_client: Client for creative agent (required if fetch_previews=True) Returns: TaskResult containing GetProductsResponse with optional preview URLs in metadata Raises: ValueError: If fetch_previews=True but creative_agent_client is not provided """ if fetch_previews and not creative_agent_client: raise ValueError("creative_agent_client is required when fetch_previews=True") operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_products", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_products(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_products", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) result: TaskResult[GetProductsResponse] = self.adapter._parse_response( raw_result, GetProductsResponse ) if fetch_previews and result.success and result.data and creative_agent_client: from adcp.utils.preview_cache import add_preview_urls_to_products products_with_previews = await add_preview_urls_to_products( result.data.products, creative_agent_client, use_batch=True, output_format=preview_output_format, ) result.metadata = result.metadata or {} result.metadata["products_with_previews"] = products_with_previews return resultGet advertising products.
Args
request- Request parameters
fetch_previews- If True, generate preview URLs for each product's formats (uses batch API for 5-10x performance improvement)
preview_output_format- "url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead)
creative_agent_client- Client for creative agent (required if fetch_previews=True)
Returns
TaskResult containing GetProductsResponse with optional preview URLs in metadata
Raises
ValueError- If fetch_previews=True but creative_agent_client is not provided
async def get_property_list(self, request: GetPropertyListRequest) ‑> TaskResult[GetPropertyListResponse]-
Expand source code
async def get_property_list( self, request: GetPropertyListRequest, ) -> TaskResult[GetPropertyListResponse]: """ Get a property list with optional resolution. When resolve=true, returns the list of resolved property identifiers. Use this to get the actual properties that match the list's filters. Args: request: Request parameters including list_id and resolve flag Returns: TaskResult containing GetPropertyListResponse with identifiers """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_property_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_property_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_property_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetPropertyListResponse)Get a property list with optional resolution.
When resolve=true, returns the list of resolved property identifiers. Use this to get the actual properties that match the list's filters.
Args
request- Request parameters including list_id and resolve flag
Returns
TaskResult containing GetPropertyListResponse with identifiers
async def get_rights(self, request: GetRightsRequest) ‑> TaskResult[Union[GetRightsResponse1, GetRightsResponse2]]-
Expand source code
async def get_rights( self, request: GetRightsRequest, ) -> TaskResult[GetRightsResponse]: """Get available rights for licensing. Searches for rights offerings using natural language query and filters by type, uses, countries, and buyer compatibility. Args: request: Request with query, uses, and optional filters. Returns: TaskResult containing GetRightsResponse with matched rights. """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_rights", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_rights(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_rights", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetRightsResponse)Get available rights for licensing.
Searches for rights offerings using natural language query and filters by type, uses, countries, and buyer compatibility.
Args
request- Request with query, uses, and optional filters.
Returns
TaskResult containing GetRightsResponse with matched rights.
async def get_signals(self, request: GetSignalsRequest) ‑> TaskResult[GetSignalsResponse]-
Expand source code
async def get_signals( self, request: GetSignalsRequest, ) -> TaskResult[GetSignalsResponse]: """ Get Signals. Args: request: Request parameters Returns: TaskResult containing GetSignalsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_signals", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.get_signals(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="get_signals", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, GetSignalsResponse)Get Signals.
Args
request- Request parameters
Returns
TaskResult containing GetSignalsResponse
def get_webhook_url(self, task_type: str, operation_id: str) ‑> str-
Expand source code
def get_webhook_url(self, task_type: str, operation_id: str) -> str: """Generate webhook URL for a task.""" if not self.webhook_url_template: raise ValueError("webhook_url_template not configured") return self.webhook_url_template.format( agent_id=self.agent_config.id, task_type=task_type, operation_id=operation_id, )Generate webhook URL for a task.
async def handle_webhook(self,
payload: dict[str, Any] | Task | TaskStatusUpdateEvent,
task_type: str,
operation_id: str,
signature: str | None = None,
timestamp: str | None = None,
raw_body: bytes | str | None = None) ‑> TaskResult[AdcpAsyncResponseData]-
Expand source code
async def handle_webhook( self, payload: dict[str, Any] | Task | TaskStatusUpdateEvent, task_type: str, operation_id: str, signature: str | None = None, timestamp: str | None = None, raw_body: bytes | str | None = None, ) -> TaskResult[AdcpAsyncResponseData]: """ Handle incoming webhook and return typed result. This method provides a unified interface for handling webhooks from both MCP and A2A protocols: - MCP Webhooks: HTTP POST with dict payload, optional HMAC signature - A2A Webhooks: Task or TaskStatusUpdateEvent objects based on status The method automatically detects the protocol type and routes to the appropriate handler. Both protocols return a consistent TaskResult structure with typed AdCP response data. Args: payload: Webhook payload - one of: - dict[str, Any]: MCP webhook payload from HTTP POST - Task: A2A webhook for terminated statuses (completed, failed) - TaskStatusUpdateEvent: A2A webhook for intermediate statuses (working, input-required, submitted) task_type: Task type from application routing (e.g., "get_products"). Applications should extract this from URL routing pattern: /webhook/{task_type}/{agent_id}/{operation_id} operation_id: Operation identifier from application routing. Used to correlate webhook notifications with original task submission. signature: Optional HMAC-SHA256 signature for MCP webhook verification (X-AdCP-Signature header). Ignored for A2A webhooks. timestamp: Optional Unix timestamp (seconds) for MCP webhook signature verification (X-AdCP-Timestamp header). Required when signature is provided. raw_body: Optional raw HTTP request body bytes for signature verification. When provided, used directly instead of re-serializing the payload, avoiding cross-language JSON serialization mismatches. Strongly recommended for production use. Returns: TaskResult with parsed task-specific response data. The structure is identical regardless of protocol. Raises: ADCPWebhookSignatureError: If MCP signature verification fails ValidationError: If MCP payload doesn't match WebhookPayload schema Note: task_type and operation_id were deprecated from the webhook payload per AdCP specification. Applications must extract these from URL routing and pass them explicitly. Examples: MCP webhook (HTTP endpoint): >>> @app.post("/webhook/{task_type}/{agent_id}/{operation_id}") >>> async def webhook_handler(task_type: str, operation_id: str, request: Request): >>> raw_body = await request.body() >>> payload = json.loads(raw_body) >>> signature = request.headers.get("X-AdCP-Signature") >>> timestamp = request.headers.get("X-AdCP-Timestamp") >>> result = await client.handle_webhook( >>> payload, task_type, operation_id, signature, timestamp, >>> raw_body=raw_body, >>> ) >>> if result.success: >>> print(f"Task completed: {result.data}") A2A webhook with Task (terminated status): >>> async def on_task_completed(task: Task): >>> # Extract task_type and operation_id from your app's task tracking >>> task_type = your_task_registry.get_type(task.id) >>> operation_id = your_task_registry.get_operation_id(task.id) >>> result = await client.handle_webhook( >>> task, task_type, operation_id >>> ) >>> if result.success: >>> print(f"Task completed: {result.data}") A2A webhook with TaskStatusUpdateEvent (intermediate status): >>> async def on_task_update(event: TaskStatusUpdateEvent): >>> # Extract task_type and operation_id from your app's task tracking >>> task_type = your_task_registry.get_type(event.task_id) >>> operation_id = your_task_registry.get_operation_id(event.task_id) >>> result = await client.handle_webhook( >>> event, task_type, operation_id >>> ) >>> if result.status == GeneratedTaskStatus.working: >>> print(f"Task still working: {result.metadata.get('message')}") """ # Detect protocol type and route to appropriate handler if isinstance(payload, (Task, TaskStatusUpdateEvent)): # A2A webhook (Task or TaskStatusUpdateEvent) return await self._handle_a2a_webhook(payload, task_type, operation_id) else: # MCP webhook (dict payload) return await self._handle_mcp_webhook( payload, task_type, operation_id, signature, timestamp, raw_body )Handle incoming webhook and return typed result.
This method provides a unified interface for handling webhooks from both MCP and A2A protocols:
- MCP Webhooks: HTTP POST with dict payload, optional HMAC signature
- A2A Webhooks: Task or TaskStatusUpdateEvent objects based on status
The method automatically detects the protocol type and routes to the appropriate handler. Both protocols return a consistent TaskResult structure with typed AdCP response data.
Args
payload- Webhook payload - one of: - dict[str, Any]: MCP webhook payload from HTTP POST - Task: A2A webhook for terminated statuses (completed, failed) - TaskStatusUpdateEvent: A2A webhook for intermediate statuses (working, input-required, submitted)
task_type- Task type from application routing (e.g., "get_products"). Applications should extract this from URL routing pattern: /webhook/{task_type}/{agent_id}/{operation_id}
operation_id- Operation identifier from application routing. Used to correlate webhook notifications with original task submission.
signature- Optional HMAC-SHA256 signature for MCP webhook verification (X-AdCP-Signature header). Ignored for A2A webhooks.
timestamp- Optional Unix timestamp (seconds) for MCP webhook signature verification (X-AdCP-Timestamp header). Required when signature is provided.
raw_body- Optional raw HTTP request body bytes for signature verification. When provided, used directly instead of re-serializing the payload, avoiding cross-language JSON serialization mismatches. Strongly recommended for production use.
Returns
TaskResult with parsed task-specific response data. The structure is identical regardless of protocol.
Raises
ADCPWebhookSignatureError- If MCP signature verification fails
ValidationError- If MCP payload doesn't match WebhookPayload schema
Note
task_type and operation_id were deprecated from the webhook payload per AdCP specification. Applications must extract these from URL routing and pass them explicitly.
Examples
MCP webhook (HTTP endpoint):
>>> @app.post("/webhook/{task_type}/{agent_id}/{operation_id}") >>> async def webhook_handler(task_type: str, operation_id: str, request: Request): >>> raw_body = await request.body() >>> payload = json.loads(raw_body) >>> signature = request.headers.get("X-AdCP-Signature") >>> timestamp = request.headers.get("X-AdCP-Timestamp") >>> result = await client.handle_webhook( >>> payload, task_type, operation_id, signature, timestamp, >>> raw_body=raw_body, >>> ) >>> if result.success: >>> print(f"Task completed: {result.data}")A2A webhook with Task (terminated status):
>>> async def on_task_completed(task: Task): >>> # Extract task_type and operation_id from your app's task tracking >>> task_type = your_task_registry.get_type(task.id) >>> operation_id = your_task_registry.get_operation_id(task.id) >>> result = await client.handle_webhook( >>> task, task_type, operation_id >>> ) >>> if result.success: >>> print(f"Task completed: {result.data}")A2A webhook with TaskStatusUpdateEvent (intermediate status):
>>> async def on_task_update(event: TaskStatusUpdateEvent): >>> # Extract task_type and operation_id from your app's task tracking >>> task_type = your_task_registry.get_type(event.task_id) >>> operation_id = your_task_registry.get_operation_id(event.task_id) >>> result = await client.handle_webhook( >>> event, task_type, operation_id >>> ) >>> if result.status == GeneratedTaskStatus.working: >>> print(f"Task still working: {result.metadata.get('message')}") async def identity_match(self, request: IdentityMatchRequest) ‑> TaskResult[IdentityMatchResponse]-
Expand source code
async def identity_match( self, request: IdentityMatchRequest, ) -> TaskResult[IdentityMatchResponse]: """Match user identity for package eligibility. Evaluates a user identity token against all active packages for frequency capping and personalization. Args: request: Identity match request with user_token, uid_type, and package_ids. Returns: TaskResult containing IdentityMatchResponse with eligible_package_ids. """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True, by_alias=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="identity_match", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.identity_match(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="identity_match", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, IdentityMatchResponse)Match user identity for package eligibility.
Evaluates a user identity token against all active packages for frequency capping and personalization.
Args
request- Identity match request with user_token, uid_type, and package_ids.
Returns
TaskResult containing IdentityMatchResponse with eligible_package_ids.
async def list_accounts(self, request: ListAccountsRequest) ‑> TaskResult[ListAccountsResponse]-
Expand source code
async def list_accounts( self, request: ListAccountsRequest, ) -> TaskResult[ListAccountsResponse]: """ List Accounts. Args: request: Request parameters Returns: TaskResult containing ListAccountsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_accounts", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_accounts(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_accounts", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListAccountsResponse)List Accounts.
Args
request- Request parameters
Returns
TaskResult containing ListAccountsResponse
async def list_collection_lists(self, request: ListCollectionListsRequest) ‑> TaskResult[ListCollectionListsResponse]-
Expand source code
async def list_collection_lists( self, request: ListCollectionListsRequest, ) -> TaskResult[ListCollectionListsResponse]: """List collection lists owned by a principal. Args: request: Request parameters with optional filtering Returns: TaskResult containing ListCollectionListsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_collection_lists", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_collection_lists(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_collection_lists", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListCollectionListsResponse)List collection lists owned by a principal.
Args
request- Request parameters with optional filtering
Returns
TaskResult containing ListCollectionListsResponse
async def list_content_standards(self, request: ListContentStandardsRequest) ‑> TaskResult[Union[ListContentStandardsResponse1, ListContentStandardsResponse2]]-
Expand source code
async def list_content_standards( self, request: ListContentStandardsRequest, ) -> TaskResult[ListContentStandardsResponse]: """ List content standards configurations. Args: request: Request parameters including optional filters Returns: TaskResult containing ListContentStandardsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_content_standards", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_content_standards(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_content_standards", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListContentStandardsResponse)List content standards configurations.
Args
request- Request parameters including optional filters
Returns
TaskResult containing ListContentStandardsResponse
async def list_creative_formats(self,
request: ListCreativeFormatsRequest,
fetch_previews: bool = False,
preview_output_format: str = 'url') ‑> TaskResult[ListCreativeFormatsResponse]-
Expand source code
async def list_creative_formats( self, request: ListCreativeFormatsRequest, fetch_previews: bool = False, preview_output_format: str = "url", ) -> TaskResult[ListCreativeFormatsResponse]: """ List supported creative formats. Args: request: Request parameters fetch_previews: If True, generate preview URLs for each format using sample manifests (uses batch API for 5-10x performance improvement) preview_output_format: "url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead) Returns: TaskResult containing ListCreativeFormatsResponse with optional preview URLs in metadata """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_creative_formats", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_creative_formats(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_creative_formats", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) result: TaskResult[ListCreativeFormatsResponse] = self.adapter._parse_response( raw_result, ListCreativeFormatsResponse ) if fetch_previews and result.success and result.data: from adcp.utils.preview_cache import add_preview_urls_to_formats formats_with_previews = await add_preview_urls_to_formats( result.data.formats, self, use_batch=True, output_format=preview_output_format, ) result.metadata = result.metadata or {} result.metadata["formats_with_previews"] = formats_with_previews return resultList supported creative formats.
Args
request- Request parameters
fetch_previews- If True, generate preview URLs for each format using sample manifests (uses batch API for 5-10x performance improvement)
preview_output_format- "url" for iframe URLs (default), "html" for direct embedding (2-3x faster, no iframe overhead)
Returns
TaskResult containing ListCreativeFormatsResponse with optional preview URLs in metadata
async def list_creatives(self, request: ListCreativesRequest) ‑> TaskResult[ListCreativesResponse]-
Expand source code
async def list_creatives( self, request: ListCreativesRequest, ) -> TaskResult[ListCreativesResponse]: """ List Creatives. Args: request: Request parameters Returns: TaskResult containing ListCreativesResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_creatives", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_creatives(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_creatives", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListCreativesResponse)List Creatives.
Args
request- Request parameters
Returns
TaskResult containing ListCreativesResponse
async def list_property_lists(self, request: ListPropertyListsRequest) ‑> TaskResult[ListPropertyListsResponse]-
Expand source code
async def list_property_lists( self, request: ListPropertyListsRequest, ) -> TaskResult[ListPropertyListsResponse]: """ List property lists owned by a principal. Retrieves metadata for all property lists, optionally filtered by principal or pagination parameters. Args: request: Request parameters with optional filtering Returns: TaskResult containing ListPropertyListsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_property_lists", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.list_property_lists(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="list_property_lists", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ListPropertyListsResponse)List property lists owned by a principal.
Retrieves metadata for all property lists, optionally filtered by principal or pagination parameters.
Args
request- Request parameters with optional filtering
Returns
TaskResult containing ListPropertyListsResponse
async def list_tools(self) ‑> list[str]-
Expand source code
async def list_tools(self) -> list[str]: """ List available tools from the agent. Returns: List of tool names """ return await self.adapter.list_tools()List available tools from the agent.
Returns
List of tool names
async def log_event(self, request: LogEventRequest) ‑> TaskResult[Union[LogEventResponse1, LogEventResponse2]]-
Expand source code
async def log_event( self, request: LogEventRequest, ) -> TaskResult[LogEventResponse]: """ Log Event. Args: request: Request parameters Returns: TaskResult containing LogEventResponse """ self._validate_task_features("log_event") operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="log_event", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.log_event(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="log_event", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, LogEventResponse)Log Event.
Args
request- Request parameters
Returns
TaskResult containing LogEventResponse
async def preview_creative(self, request: PreviewCreativeRequest) ‑> TaskResult[Union[PreviewCreativeResponse1, PreviewCreativeResponse2, PreviewCreativeResponse3]]-
Expand source code
async def preview_creative( self, request: PreviewCreativeRequest, ) -> TaskResult[PreviewCreativeResponse]: """ Generate preview of a creative manifest. Args: request: Request parameters Returns: TaskResult containing PreviewCreativeResponse with preview URLs """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="preview_creative", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.preview_creative(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="preview_creative", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, PreviewCreativeResponse)Generate preview of a creative manifest.
Args
request- Request parameters
Returns
TaskResult containing PreviewCreativeResponse with preview URLs
async def provide_performance_feedback(self, request: ProvidePerformanceFeedbackRequest) ‑> TaskResult[Union[ProvidePerformanceFeedbackResponse1, ProvidePerformanceFeedbackResponse2]]-
Expand source code
async def provide_performance_feedback( self, request: ProvidePerformanceFeedbackRequest, ) -> TaskResult[ProvidePerformanceFeedbackResponse]: """ Provide Performance Feedback. Args: request: Request parameters Returns: TaskResult containing ProvidePerformanceFeedbackResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="provide_performance_feedback", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.provide_performance_feedback(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="provide_performance_feedback", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ProvidePerformanceFeedbackResponse)Provide Performance Feedback.
Args
request- Request parameters
Returns
TaskResult containing ProvidePerformanceFeedbackResponse
async def refresh_capabilities(self) ‑> GetAdcpCapabilitiesResponse-
Expand source code
async def refresh_capabilities(self) -> GetAdcpCapabilitiesResponse: """Fetch capabilities from the seller, bypassing cache. Returns: The seller's capabilities response. """ result = await self.get_adcp_capabilities(GetAdcpCapabilitiesRequest()) if result.success and result.data is not None: self._capabilities = result.data self._feature_resolver = FeatureResolver(result.data) self._capabilities_fetched_at = time.monotonic() return self._capabilities raise ADCPError( f"Failed to fetch capabilities: {result.error or result.message}", agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, )Fetch capabilities from the seller, bypassing cache.
Returns
The seller's capabilities response.
async def report_plan_outcome(self, request: ReportPlanOutcomeRequest) ‑> TaskResult[ReportPlanOutcomeResponse]-
Expand source code
async def report_plan_outcome( self, request: ReportPlanOutcomeRequest, ) -> TaskResult[ReportPlanOutcomeResponse]: """Report the outcome of a governed action to the governance agent.""" operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="report_plan_outcome", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.report_plan_outcome(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="report_plan_outcome", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ReportPlanOutcomeResponse)Report the outcome of a governed action to the governance agent.
async def report_usage(self, request: ReportUsageRequest) ‑> TaskResult[ReportUsageResponse]-
Expand source code
async def report_usage( self, request: ReportUsageRequest, ) -> TaskResult[ReportUsageResponse]: """ Report Usage. Args: request: Request parameters Returns: TaskResult containing ReportUsageResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="report_usage", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.report_usage(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="report_usage", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ReportUsageResponse)Report Usage.
Args
request- Request parameters
Returns
TaskResult containing ReportUsageResponse
def require(self, *features: str) ‑> None-
Expand source code
def require(self, *features: str) -> None: """Assert that the seller supports all listed features. Args: *features: Feature identifiers to require. Raises: ADCPFeatureUnsupportedError: If any features are not supported. ADCPError: If capabilities have not been fetched yet. """ self._ensure_resolver().require( *features, agent_id=self.agent_config.id, agent_uri=self.agent_config.agent_uri, )Assert that the seller supports all listed features.
Args
*features- Feature identifiers to require.
Raises
ADCPFeatureUnsupportedError- If any features are not supported.
ADCPError- If capabilities have not been fetched yet.
async def si_get_offering(self, request: SiGetOfferingRequest) ‑> TaskResult[SiGetOfferingResponse]-
Expand source code
async def si_get_offering( self, request: SiGetOfferingRequest, ) -> TaskResult[SiGetOfferingResponse]: """ Get sponsored intelligence offering. Retrieves product/service offerings that can be presented in a sponsored intelligence session. Args: request: Request parameters including brand context Returns: TaskResult containing SiGetOfferingResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_get_offering", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.si_get_offering(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_get_offering", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SiGetOfferingResponse)Get sponsored intelligence offering.
Retrieves product/service offerings that can be presented in a sponsored intelligence session.
Args
request- Request parameters including brand context
Returns
TaskResult containing SiGetOfferingResponse
async def si_initiate_session(self, request: SiInitiateSessionRequest) ‑> TaskResult[SiInitiateSessionResponse]-
Expand source code
async def si_initiate_session( self, request: SiInitiateSessionRequest, ) -> TaskResult[SiInitiateSessionResponse]: """ Initiate a sponsored intelligence session. Starts a conversational brand experience session with a user. Args: request: Request parameters including identity and context Returns: TaskResult containing SiInitiateSessionResponse with session_id """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_initiate_session", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.si_initiate_session(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_initiate_session", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SiInitiateSessionResponse)Initiate a sponsored intelligence session.
Starts a conversational brand experience session with a user.
Args
request- Request parameters including identity and context
Returns
TaskResult containing SiInitiateSessionResponse with session_id
async def si_send_message(self, request: SiSendMessageRequest) ‑> TaskResult[SiSendMessageResponse]-
Expand source code
async def si_send_message( self, request: SiSendMessageRequest, ) -> TaskResult[SiSendMessageResponse]: """ Send a message in a sponsored intelligence session. Continues the conversation in an active SI session. Args: request: Request parameters including session_id and message Returns: TaskResult containing SiSendMessageResponse with brand response """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_send_message", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.si_send_message(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_send_message", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SiSendMessageResponse)Send a message in a sponsored intelligence session.
Continues the conversation in an active SI session.
Args
request- Request parameters including session_id and message
Returns
TaskResult containing SiSendMessageResponse with brand response
async def si_terminate_session(self, request: SiTerminateSessionRequest) ‑> TaskResult[SiTerminateSessionResponse]-
Expand source code
async def si_terminate_session( self, request: SiTerminateSessionRequest, ) -> TaskResult[SiTerminateSessionResponse]: """ Terminate a sponsored intelligence session. Ends an active SI session, optionally with follow-up actions. Args: request: Request parameters including session_id and termination context Returns: TaskResult containing SiTerminateSessionResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_terminate_session", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.si_terminate_session(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="si_terminate_session", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SiTerminateSessionResponse)Terminate a sponsored intelligence session.
Ends an active SI session, optionally with follow-up actions.
Args
request- Request parameters including session_id and termination context
Returns
TaskResult containing SiTerminateSessionResponse
def supports(self, feature: str) ‑> bool-
Expand source code
def supports(self, feature: str) -> bool: """Check if the seller supports a feature. Supports multiple feature namespaces: - Protocol support: ``supports("media_buy")`` checks ``supported_protocols`` - Extension support: ``supports("ext:scope3")`` checks ``extensions_supported`` - Targeting: ``supports("targeting.geo_countries")`` checks ``media_buy.execution.targeting`` - Media buy features: ``supports("audience_targeting")`` checks ``media_buy.features`` - Signals features: ``supports("catalog_signals")`` checks ``signals.features`` Args: feature: Feature identifier to check. Returns: True if the seller declares the feature as supported. Raises: ADCPError: If capabilities have not been fetched yet. """ return self._ensure_resolver().supports(feature)Check if the seller supports a feature.
Supports multiple feature namespaces: - Protocol support:
supports("media_buy")checkssupported_protocols- Extension support:supports("ext:scope3")checksextensions_supported- Targeting:supports("targeting.geo_countries")checksmedia_buy.execution.targeting- Media buy features:supports("audience_targeting")checksmedia_buy.features- Signals features:supports("catalog_signals")checkssignals.featuresArgs
feature- Feature identifier to check.
Returns
True if the seller declares the feature as supported.
Raises
ADCPError- If capabilities have not been fetched yet.
async def sync_accounts(self, request: SyncAccountsRequest) ‑> TaskResult[Union[SyncAccountsResponse1, SyncAccountsResponse2]]-
Expand source code
async def sync_accounts( self, request: SyncAccountsRequest, ) -> TaskResult[SyncAccountsResponse]: """ Sync Accounts. Args: request: Request parameters Returns: TaskResult containing SyncAccountsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_accounts", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_accounts(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_accounts", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncAccountsResponse)Sync Accounts.
Args
request- Request parameters
Returns
TaskResult containing SyncAccountsResponse
async def sync_audiences(self, request: SyncAudiencesRequest) ‑> TaskResult[Union[SyncAudiencesResponse1, SyncAudiencesResponse2]]-
Expand source code
async def sync_audiences( self, request: SyncAudiencesRequest, ) -> TaskResult[SyncAudiencesResponse]: """ Sync Audiences. Args: request: Request parameters Returns: TaskResult containing SyncAudiencesResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_audiences", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_audiences(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_audiences", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncAudiencesResponse)Sync Audiences.
Args
request- Request parameters
Returns
TaskResult containing SyncAudiencesResponse
async def sync_catalogs(self, request: SyncCatalogsRequest) ‑> TaskResult[Union[SyncCatalogsResponse1, SyncCatalogsResponse2]]-
Expand source code
async def sync_catalogs( self, request: SyncCatalogsRequest, ) -> TaskResult[SyncCatalogsResponse]: """ Sync Catalogs. Args: request: Request parameters Returns: TaskResult containing SyncCatalogsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_catalogs", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_catalogs(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_catalogs", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncCatalogsResponse)Sync Catalogs.
Args
request- Request parameters
Returns
TaskResult containing SyncCatalogsResponse
async def sync_creatives(self, request: SyncCreativesRequest) ‑> TaskResult[Union[SyncCreativesResponse1, SyncCreativesResponse2, SyncCreativesResponse3]]-
Expand source code
async def sync_creatives( self, request: SyncCreativesRequest, ) -> TaskResult[SyncCreativesResponse]: """ Sync Creatives. Args: request: Request parameters Returns: TaskResult containing SyncCreativesResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_creatives", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_creatives(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_creatives", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncCreativesResponse)Sync Creatives.
Args
request- Request parameters
Returns
TaskResult containing SyncCreativesResponse
async def sync_event_sources(self, request: SyncEventSourcesRequest) ‑> TaskResult[Union[SyncEventSourcesResponse1, SyncEventSourcesResponse2]]-
Expand source code
async def sync_event_sources( self, request: SyncEventSourcesRequest, ) -> TaskResult[SyncEventSourcesResponse]: """ Sync Event Sources. Args: request: Request parameters Returns: TaskResult containing SyncEventSourcesResponse """ self._validate_task_features("sync_event_sources") operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_event_sources", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_event_sources(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_event_sources", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncEventSourcesResponse)Sync Event Sources.
Args
request- Request parameters
Returns
TaskResult containing SyncEventSourcesResponse
async def sync_governance(self, request: SyncGovernanceRequest) ‑> TaskResult[Union[SyncGovernanceResponse1, SyncGovernanceResponse2]]-
Expand source code
async def sync_governance( self, request: SyncGovernanceRequest, ) -> TaskResult[SyncGovernanceResponse]: """Sync governance agents attached to an account. Attach, detach, or replace the set of governance agents that must be consulted for plan approval on an account. Args: request: Request parameters with account and governance agents Returns: TaskResult containing SyncGovernanceResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_governance", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_governance(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_governance", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncGovernanceResponse)Sync governance agents attached to an account.
Attach, detach, or replace the set of governance agents that must be consulted for plan approval on an account.
Args
request- Request parameters with account and governance agents
Returns
TaskResult containing SyncGovernanceResponse
async def sync_plans(self, request: SyncPlansRequest) ‑> TaskResult[SyncPlansResponse]-
Expand source code
async def sync_plans( self, request: SyncPlansRequest, ) -> TaskResult[SyncPlansResponse]: """Sync campaign governance plans to the governance agent.""" operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_plans", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.sync_plans(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="sync_plans", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, SyncPlansResponse)Sync campaign governance plans to the governance agent.
async def update_collection_list(self, request: UpdateCollectionListRequest) ‑> TaskResult[UpdateCollectionListResponse]-
Expand source code
async def update_collection_list( self, request: UpdateCollectionListRequest, ) -> TaskResult[UpdateCollectionListResponse]: """Update a collection list. Args: request: Request parameters with list_id and updates Returns: TaskResult containing UpdateCollectionListResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_collection_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.update_collection_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_collection_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, UpdateCollectionListResponse)Update a collection list.
Args
request- Request parameters with list_id and updates
Returns
TaskResult containing UpdateCollectionListResponse
async def update_content_standards(self, request: UpdateContentStandardsRequest) ‑> TaskResult[Union[UpdateContentStandardsResponse1, UpdateContentStandardsResponse2]]-
Expand source code
async def update_content_standards( self, request: UpdateContentStandardsRequest, ) -> TaskResult[UpdateContentStandardsResponse]: """ Update a content standards configuration. Args: request: Request parameters including standards_id and updates Returns: TaskResult containing UpdateContentStandardsResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_content_standards", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.update_content_standards(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_content_standards", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, UpdateContentStandardsResponse)Update a content standards configuration.
Args
request- Request parameters including standards_id and updates
Returns
TaskResult containing UpdateContentStandardsResponse
async def update_media_buy(self, request: UpdateMediaBuyRequest) ‑> TaskResult[Union[UpdateMediaBuyResponse1, UpdateMediaBuyResponse2]]-
Expand source code
async def update_media_buy( self, request: UpdateMediaBuyRequest, ) -> TaskResult[UpdateMediaBuyResponse]: """ Update an existing media buy reservation. Modifies a previously created media buy by updating packages or publisher properties. The update operation uses discriminated unions to specify what to change - either package details or targeting properties. Args: request: Media buy update parameters including: - media_buy_id: Identifier from create_media_buy response - updates: Discriminated union specifying update type: * UpdateMediaBuyPackagesRequest: Modify package selections * UpdateMediaBuyPropertiesRequest: Change targeting properties Returns: TaskResult containing UpdateMediaBuyResponse with: - media_buy_id: The updated media buy identifier - status: Updated state of the media buy - packages: Updated package configurations - Additional platform-specific metadata Example: >>> from adcp import ADCPClient, UpdateMediaBuyPackagesRequest >>> client = ADCPClient(agent_config) >>> request = UpdateMediaBuyPackagesRequest( ... media_buy_id="mb_123", ... packages=[updated_package] ... ) >>> result = await client.update_media_buy(request) >>> if result.success: ... updated_packages = result.data.packages """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_media_buy", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.update_media_buy(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_media_buy", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, UpdateMediaBuyResponse)Update an existing media buy reservation.
Modifies a previously created media buy by updating packages or publisher properties. The update operation uses discriminated unions to specify what to change - either package details or targeting properties.
Args
request- Media buy update parameters including: - media_buy_id: Identifier from create_media_buy response - updates: Discriminated union specifying update type: * UpdateMediaBuyPackagesRequest: Modify package selections * UpdateMediaBuyPropertiesRequest: Change targeting properties
Returns
TaskResult containing UpdateMediaBuyResponse with: - media_buy_id: The updated media buy identifier - status: Updated state of the media buy - packages: Updated package configurations - Additional platform-specific metadata
Example
>>> from adcp import ADCPClient, UpdateMediaBuyPackagesRequest >>> client = ADCPClient(agent_config) >>> request = UpdateMediaBuyPackagesRequest( ... media_buy_id="mb_123", ... packages=[updated_package] ... ) >>> result = await client.update_media_buy(request) >>> if result.success: ... updated_packages = result.data.packages async def update_property_list(self, request: UpdatePropertyListRequest) ‑> TaskResult[UpdatePropertyListResponse]-
Expand source code
async def update_property_list( self, request: UpdatePropertyListRequest, ) -> TaskResult[UpdatePropertyListResponse]: """ Update a property list. Modifies the filters, brand manifest, or other parameters of an existing property list. Args: request: Request parameters with list_id and updates Returns: TaskResult containing UpdatePropertyListResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_property_list", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.update_property_list(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_property_list", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, UpdatePropertyListResponse)Update a property list.
Modifies the filters, brand manifest, or other parameters of an existing property list.
Args
request- Request parameters with list_id and updates
Returns
TaskResult containing UpdatePropertyListResponse
async def update_rights(self, request: UpdateRightsRequest) ‑> TaskResult[Union[UpdateRightsResponse1, UpdateRightsResponse2]]-
Expand source code
async def update_rights( self, request: UpdateRightsRequest, ) -> TaskResult[UpdateRightsResponse]: """Update terms of an existing rights acquisition. Modifies a previously acquired rights record — typically to extend the ``end_date``, raise the ``impression_cap``, pause/unpause via ``paused``, or swap to a compatible ``pricing_option_id``. Partial update: pass only the fields you want to change. Failure modes (surface as ``TaskResult`` with ``success=False``): * Acquisition is expired or revoked — the seller rejects the update outright; mint a fresh ``acquire_rights`` instead. * ``pricing_option_id`` swap to an incompatible option — rejected; the new option's terms must be a strict superset / compatible with the original acquisition. * No partial-state mutations on rejection: the acquisition remains at its prior state when any field fails validation. Args: request: Request with ``rights_id`` and at least one mutable field (``end_date``, ``impression_cap``, ``paused``, or ``pricing_option_id``). Returns: TaskResult containing UpdateRightsResponse (updated or error). """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_rights", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.update_rights(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="update_rights", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, UpdateRightsResponse)Update terms of an existing rights acquisition.
Modifies a previously acquired rights record — typically to extend the
end_date, raise theimpression_cap, pause/unpause viapaused, or swap to a compatiblepricing_option_id. Partial update: pass only the fields you want to change.Failure modes (surface as
TaskResultwithsuccess=False):- Acquisition is expired or revoked — the seller rejects the update
outright; mint a fresh
acquire_rightsinstead. pricing_option_idswap to an incompatible option — rejected; the new option's terms must be a strict superset / compatible with the original acquisition.- No partial-state mutations on rejection: the acquisition remains at its prior state when any field fails validation.
Args
request- Request with
rights_idand at least one mutable field (end_date,impression_cap,paused, orpricing_option_id).
Returns
TaskResult containing UpdateRightsResponse (updated or error).
- Acquisition is expired or revoked — the seller rejects the update
outright; mint a fresh
def use_idempotency_key(self, key: str) ‑> Iterator[str]-
Expand source code
@contextlib.contextmanager def use_idempotency_key(self, key: str) -> Iterator[str]: """Pin an ``idempotency_key`` for the next mutating call on THIS client. Use when you've persisted a key (e.g., in a buyer-side database) and want the SDK to send that exact key on resume or retry across process restarts. The key is validated against ``^[A-Za-z0-9_.:-]{16,255}$`` on entry; a ``ValueError`` is raised for malformed keys. Scope rules: * **Single-use within scope.** The first mutating call inside the ``with`` block consumes the pinned key; a second mutating call falls through to a fresh UUID. This protects against ``asyncio.gather`` siblings accidentally sharing the key (which would trigger ``IDEMPOTENCY_CONFLICT`` or silently duplicate work). If you need to retry, wrap each attempt in its own ``with`` block. * **Client-scoped.** The pinned key applies only to calls on THIS client. A mutating call on a sibling ``ADCPClient`` inside the same ``with`` block generates a fresh key and emits a ``UserWarning`` — keys must be unique per (seller, request) pair (AdCP #2315). * **No nesting.** Nested ``use_idempotency_key`` on the same client raises ``RuntimeError``. Example:: with client.use_idempotency_key(campaign.stored_key): result = await client.create_media_buy(request) """ from adcp import _idempotency _idempotency.validate_key(key) token = self._idempotency_client_token if token in _idempotency._scoped_keys: raise RuntimeError( "use_idempotency_key is already active on this client; " "nested usage is not supported." ) _idempotency._scoped_keys[token] = key try: yield key finally: _idempotency._scoped_keys.pop(token, None)Pin an
idempotency_keyfor the next mutating call on THIS client.Use when you've persisted a key (e.g., in a buyer-side database) and want the SDK to send that exact key on resume or retry across process restarts. The key is validated against
^[A-Za-z0-9_.:-]{16,255}$on entry; aValueErroris raised for malformed keys.Scope rules:
- Single-use within scope. The first mutating call inside the
withblock consumes the pinned key; a second mutating call falls through to a fresh UUID. This protects againstasyncio.gathersiblings accidentally sharing the key (which would triggerIDEMPOTENCY_CONFLICTor silently duplicate work). If you need to retry, wrap each attempt in its ownwithblock. - Client-scoped. The pinned key applies only to calls on THIS
client. A mutating call on a sibling
ADCPClientinside the samewithblock generates a fresh key and emits aUserWarning— keys must be unique per (seller, request) pair (AdCP #2315). - No nesting. Nested
use_idempotency_keyon the same client raisesRuntimeError.
Example::
with client.use_idempotency_key(campaign.stored_key): result = await client.create_media_buy(request) - Single-use within scope. The first mutating call inside the
async def validate_content_delivery(self, request: ValidateContentDeliveryRequest) ‑> TaskResult[Union[ValidateContentDeliveryResponse1, ValidateContentDeliveryResponse2]]-
Expand source code
async def validate_content_delivery( self, request: ValidateContentDeliveryRequest, ) -> TaskResult[ValidateContentDeliveryResponse]: """ Validate content delivery against standards. Validates that ad delivery records comply with content standards. Args: request: Request parameters including delivery records Returns: TaskResult containing ValidateContentDeliveryResponse """ operation_id = create_operation_id() params = request.model_dump(mode="json", exclude_none=True) self._emit_activity( Activity( type=ActivityType.PROTOCOL_REQUEST, operation_id=operation_id, agent_id=self.agent_config.id, task_type="validate_content_delivery", timestamp=datetime.now(timezone.utc).isoformat(), ) ) raw_result = await self.adapter.validate_content_delivery(params) self._emit_activity( Activity( type=ActivityType.PROTOCOL_RESPONSE, operation_id=operation_id, agent_id=self.agent_config.id, task_type="validate_content_delivery", status=raw_result.status, timestamp=datetime.now(timezone.utc).isoformat(), ) ) return self.adapter._parse_response(raw_result, ValidateContentDeliveryResponse)Validate content delivery against standards.
Validates that ad delivery records comply with content standards.
Args
request- Request parameters including delivery records
Returns
TaskResult containing ValidateContentDeliveryResponse
class ADCPMultiAgentClient (agents: list[AgentConfig],
webhook_url_template: str | None = None,
webhook_secret: str | None = None,
on_activity: Callable[[Activity], None] | None = None,
handlers: dict[str, Callable[..., Any]] | None = None,
signing: SigningConfig | None = None)-
Expand source code
class ADCPMultiAgentClient: """Client for managing multiple AdCP agents.""" def __init__( self, agents: list[AgentConfig], webhook_url_template: str | None = None, webhook_secret: str | None = None, on_activity: Callable[[Activity], None] | None = None, handlers: dict[str, Callable[..., Any]] | None = None, signing: SigningConfig | None = None, ): """ Initialize multi-agent client. Args: agents: List of agent configurations webhook_url_template: Template for webhook URLs webhook_secret: Secret for webhook verification on_activity: Callback for activity events handlers: Task completion handlers signing: Optional RFC 9421 signing config forwarded to every per-agent ADCPClient. The same identity signs traffic to all agents. See ADCPClient.__init__ for details. """ self.agents = { agent.id: ADCPClient( agent, webhook_url_template=webhook_url_template, webhook_secret=webhook_secret, on_activity=on_activity, signing=signing, ) for agent in agents } self.handlers = handlers or {} def agent(self, agent_id: str) -> ADCPClient: """Get client for specific agent.""" if agent_id not in self.agents: raise ValueError(f"Agent not found: {agent_id}") return self.agents[agent_id] @property def agent_ids(self) -> list[str]: """Get list of agent IDs.""" return list(self.agents.keys()) async def close(self) -> None: """Close all agent clients and clean up resources.""" import asyncio logger.debug("Closing all agent clients in multi-agent client") close_tasks = [client.close() for client in self.agents.values()] await asyncio.gather(*close_tasks, return_exceptions=True) async def __aenter__(self) -> ADCPMultiAgentClient: """Async context manager entry.""" return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """Async context manager exit.""" await self.close() async def get_products( self, request: GetProductsRequest, ) -> list[TaskResult[GetProductsResponse]]: """ Execute get_products across all agents in parallel. Args: request: Request parameters Returns: List of TaskResults containing GetProductsResponse for each agent """ import asyncio tasks = [agent.get_products(request) for agent in self.agents.values()] return await asyncio.gather(*tasks) @classmethod def from_env(cls) -> ADCPMultiAgentClient: """Create client from environment variables.""" agents_json = os.getenv("ADCP_AGENTS") if not agents_json: raise ValueError("ADCP_AGENTS environment variable not set") agents_data = json.loads(agents_json) agents = [AgentConfig(**agent) for agent in agents_data] return cls( agents=agents, webhook_url_template=os.getenv("WEBHOOK_URL_TEMPLATE"), webhook_secret=os.getenv("WEBHOOK_SECRET"), )Client for managing multiple AdCP agents.
Initialize multi-agent client.
Args
agents- List of agent configurations
webhook_url_template- Template for webhook URLs
webhook_secret- Secret for webhook verification
on_activity- Callback for activity events
handlers- Task completion handlers
signing- Optional RFC 9421 signing config forwarded to every per-agent ADCPClient. The same identity signs traffic to all agents. See ADCPClient.init for details.
Static methods
def from_env() ‑> ADCPMultiAgentClient-
Create client from environment variables.
Instance variables
prop agent_ids : list[str]-
Expand source code
@property def agent_ids(self) -> list[str]: """Get list of agent IDs.""" return list(self.agents.keys())Get list of agent IDs.
Methods
def agent(self, agent_id: str) ‑> ADCPClient-
Expand source code
def agent(self, agent_id: str) -> ADCPClient: """Get client for specific agent.""" if agent_id not in self.agents: raise ValueError(f"Agent not found: {agent_id}") return self.agents[agent_id]Get client for specific agent.
async def close(self) ‑> None-
Expand source code
async def close(self) -> None: """Close all agent clients and clean up resources.""" import asyncio logger.debug("Closing all agent clients in multi-agent client") close_tasks = [client.close() for client in self.agents.values()] await asyncio.gather(*close_tasks, return_exceptions=True)Close all agent clients and clean up resources.
async def get_products(self, request: GetProductsRequest) ‑> list[TaskResult[GetProductsResponse]]-
Expand source code
async def get_products( self, request: GetProductsRequest, ) -> list[TaskResult[GetProductsResponse]]: """ Execute get_products across all agents in parallel. Args: request: Request parameters Returns: List of TaskResults containing GetProductsResponse for each agent """ import asyncio tasks = [agent.get_products(request) for agent in self.agents.values()] return await asyncio.gather(*tasks)Execute get_products across all agents in parallel.
Args
request- Request parameters
Returns
List of TaskResults containing GetProductsResponse for each agent