Module adcp.server
ADCP Server Framework.
Build an AdCP agent in minutes. The framework handles the protocol plumbing so you focus on business logic.
Quickstart (class-based)::

    from adcp.server import ADCPHandler, serve
    from adcp.server.responses import capabilities_response, products_response

    class MySeller(ADCPHandler):
        async def get_adcp_capabilities(self, params, context=None):
            return capabilities_response(["media_buy"])

        async def get_products(self, params, context=None):
            return products_response(MY_PRODUCTS)

    serve(MySeller(), name="my-seller")

Quickstart (decorator-based)::

    from adcp.server import adcp_server, serve
    from adcp.server.responses import products_response

    server = adcp_server("my-seller")

    @server.get_products
    async def get_products(params, context=None):
        return products_response(MY_PRODUCTS)

    serve(server, name="my-seller")  # capabilities auto-generated

What the framework does automatically:
- Error responses: adcp_error("BUDGET_TOO_LOW") auto-populates recovery classification (transient/correctable/terminal) from 20+ standard codes.
- State transitions: media_buy_response(..., status="active") auto-populates valid_actions from the status. No manual mapping.
- Account resolution: resolve_account(params, my_resolver) auto-resolves AccountReference and returns ACCOUNT_NOT_FOUND errors.
- Context passthrough: inject_context(params, response) echoes the request context field back in the response (ADCP requirement).
- Cancellation: cancel_media_buy_response(id, "buyer") auto-sets canceled_at, status, and valid_actions=[].
- Capabilities: The decorator builder auto-generates get_adcp_capabilities from which handlers you register.
- Validation: GovernanceHandler and ContentStandardsHandler auto-validate request dicts into Pydantic models before your handler code runs.
Sub-modules
adcp.server.a2a_server-
A2A server support for ADCP handlers …
adcp.server.base-
Base classes for ADCP server implementations …
adcp.server.brand-
Brand rights handler for ADCP server implementations.
adcp.server.builder-
Decorator-based server builder for ADCP …
adcp.server.compliance-
Compliance test controller handler for ADCP server implementations.
adcp.server.content_standards-
Content Standards protocol handler …
adcp.server.governance-
Governance protocol handler …
adcp.server.helpers-
DX helpers for ADCP server builders …
adcp.server.idempotency-
Server-side idempotency middleware for AdCP mutating tool handlers …
adcp.server.mcp_tools-
MCP server integration helpers …
adcp.server.proposal-
Proposal generation helpers …
adcp.server.responses-
Response builder helpers for ADCP servers …
adcp.server.sponsored_intelligence-
Sponsored Intelligence protocol handler …
adcp.server.test_controller-
Built-in comply_test_controller for ADCP servers …
adcp.server.tmp-
TMP (Temporal Matching Protocol) handler for ADCP server implementations.
adcp.server.translate-
Error translation and request normalization for proxy and custom-transport servers …
Functions
def activate_signal_response(deployments: list[dict[str, Any]], *, sandbox: bool = True) ‑> dict[str, typing.Any]-
Source code::

    def activate_signal_response(
        deployments: list[dict[str, Any]],
        *,
        sandbox: bool = True,
    ) -> dict[str, Any]:
        """Build an activate_signal success response.

        Each deployment should include: type, is_live, activation_key.
        For platform: platform, account. For agent: agent_url.
        Matches ActivateSignalResponse1 (success) schema.
        """
        return {
            "deployments": deployments,
            "sandbox": sandbox,
        }

Build an activate_signal success response.
Each deployment should include: type, is_live, activation_key. For platform: platform, account. For agent: agent_url. Matches ActivateSignalResponse1 (success) schema.
def adcp_error(code: str,
message: str | None = None,
*,
field: str | None = None,
suggestion: str | None = None,
recovery: str | None = None,
retry_after: int | None = None,
details: dict[str, str | int | float | bool | None] | None = None) ‑> dict[str, typing.Any]-
Source code::

    def adcp_error(
        code: str,
        message: str | None = None,
        *,
        field: str | None = None,
        suggestion: str | None = None,
        recovery: str | None = None,
        retry_after: int | None = None,
        details: dict[str, str | int | float | bool | None] | None = None,
    ) -> dict[str, Any]:
        """Build a structured ADCP error response with auto-recovery.

        Standard codes get recovery auto-populated from the code table.
        Custom codes default to "terminal".

        Args:
            code: Error code (e.g., "BUDGET_TOO_LOW").
            message: Human-readable message. Defaults to standard message.
            field: Which request field caused the error.
            suggestion: Actionable fix suggestion.
            recovery: Override ("transient", "correctable", "terminal").
            retry_after: Seconds to wait (for RATE_LIMITED).
            details: Server-generated debugging data (constraint names, limits,
                thresholds). Use only server-generated values here.
                NEVER pass request params or user-supplied strings -- they
                flow to the caller's LLM context and could enable prompt
                injection.
        """
        std = STANDARD_ERROR_CODES.get(code, {})
        err: dict[str, Any] = {
            "code": code,
            "message": message or std.get("message", code),
            "recovery": recovery or std.get("recovery", "terminal"),
        }
        if field is not None:
            err["field"] = field
        if suggestion is not None:
            err["suggestion"] = suggestion
        if retry_after is not None:
            err["retry_after"] = retry_after
        if details is not None:
            err["details"] = details
        return {"errors": [err]}

Build a structured ADCP error response with auto-recovery.
Standard codes get recovery auto-populated from the code table. Custom codes default to "terminal".
Args
code- Error code (e.g., "BUDGET_TOO_LOW").
message- Human-readable message. Defaults to standard message.
field- Which request field caused the error.
suggestion- Actionable fix suggestion.
recovery- Override ("transient", "correctable", "terminal").
retry_after- Seconds to wait (for RATE_LIMITED).
details- Server-generated debugging data (constraint names, limits, thresholds). Use only server-generated values here. NEVER pass request params or user-supplied strings – they flow to the caller's LLM context and could enable prompt injection.
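The recovery lookup described above can be sketched without the package installed. This is a trimmed local copy of the logic shown in the source, and the two-entry `STANDARD_ERROR_CODES` table below is a hypothetical stand-in: the real table's messages and classifications may differ.

```python
from __future__ import annotations

from typing import Any

# Hypothetical stand-in for the library's code table (assumed contents).
STANDARD_ERROR_CODES: dict[str, dict[str, str]] = {
    "RATE_LIMITED": {"message": "Too many requests", "recovery": "transient"},
    "BUDGET_TOO_LOW": {"message": "Budget below minimum", "recovery": "correctable"},
}


def adcp_error(
    code: str,
    message: str | None = None,
    *,
    recovery: str | None = None,
    retry_after: int | None = None,
) -> dict[str, Any]:
    # Trimmed copy: field, suggestion, and details are omitted for brevity.
    std = STANDARD_ERROR_CODES.get(code, {})
    err: dict[str, Any] = {
        "code": code,
        "message": message or std.get("message", code),
        "recovery": recovery or std.get("recovery", "terminal"),
    }
    if retry_after is not None:
        err["retry_after"] = retry_after
    return {"errors": [err]}


# A code found in the table inherits its message and recovery class.
standard = adcp_error("RATE_LIMITED", retry_after=30)

# A custom code (hypothetical name) falls back to "terminal".
custom = adcp_error("INVENTORY_SOLD_OUT", "No inventory left")

# An explicit recovery argument overrides the table entry.
overridden = adcp_error("BUDGET_TOO_LOW", recovery="terminal")
```

Note that the result is always wrapped as `{"errors": [...]}`, which is what distinguishes it from the deprecated `error_response()`.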
def adcp_server(name: str, **kwargs: Any) ‑> ADCPServerBuilder-
Source code::

    def adcp_server(name: str, **kwargs: Any) -> ADCPServerBuilder:
        """Create a decorator-based ADCP server builder.

        Args:
            name: Server name.
            **kwargs: Additional configuration (e.g., version="1.0.0").

        Returns:
            An ADCPServerBuilder instance.
        """
        return ADCPServerBuilder(name, **kwargs)

Create a decorator-based ADCP server builder.
Args
name- Server name.
**kwargs- Additional configuration (e.g., version="1.0.0").
Returns
An ADCPServerBuilder instance.
def build_creative_response(creative_manifest: dict[str, Any] | list[dict[str, Any]],
*,
sandbox: bool = True) ‑> dict[str, typing.Any]-
Source code::

    def build_creative_response(
        creative_manifest: dict[str, Any] | list[dict[str, Any]],
        *,
        sandbox: bool = True,
    ) -> dict[str, Any]:
        """Build a build_creative success response.

        Accepts either a single manifest dict or a list of manifests.
        Each manifest should include: format_id, name, assets.

        Single manifest matches BuildCreativeResponse1.
        List matches BuildCreativeResponse2 (multi-format).
        """
        if isinstance(creative_manifest, list):
            return {
                "creative_manifests": creative_manifest,
                "sandbox": sandbox,
            }
        return {
            "creative_manifest": creative_manifest,
            "sandbox": sandbox,
        }

Build a build_creative success response.
Accepts either a single manifest dict or a list of manifests. Each manifest should include: format_id, name, assets.
Single manifest matches BuildCreativeResponse1. List matches BuildCreativeResponse2 (multi-format).
def cancel_media_buy_response(media_buy_id: str,
canceled_by: str,
*,
reason: str | None = None,
canceled_at: str | None = None,
affected_packages: list[Any] | None = None,
revision: int | None = None,
sandbox: bool = True) ‑> dict[str, typing.Any]-
Source code::

    def cancel_media_buy_response(
        media_buy_id: str,
        canceled_by: str,
        *,
        reason: str | None = None,
        canceled_at: str | None = None,
        affected_packages: list[Any] | None = None,
        revision: int | None = None,
        sandbox: bool = True,
    ) -> dict[str, Any]:
        """Build a cancellation response with auto-defaults.

        Auto-sets canceled_at to now, status to "canceled", valid_actions to [].
        Requires canceled_by ("buyer" or "seller") - the field developers
        most commonly forget.
        """
        if canceled_by not in ("buyer", "seller"):
            raise ValueError(f"canceled_by must be 'buyer' or 'seller', got {canceled_by!r}")
        resp: dict[str, Any] = {
            "media_buy_id": media_buy_id,
            "status": "canceled",
            "canceled_by": canceled_by,
            "canceled_at": canceled_at or datetime.now(timezone.utc).isoformat(),
            "valid_actions": [],
            "sandbox": sandbox,
        }
        if reason is not None:
            resp["reason"] = reason
        if affected_packages is not None:
            resp["affected_packages"] = affected_packages
        if revision is not None:
            resp["revision"] = revision
        return resp

Build a cancellation response with auto-defaults.
Auto-sets canceled_at to now, status to "canceled", valid_actions to []. Requires canceled_by ("buyer" or "seller") - the field developers most commonly forget.
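The defaults and the `canceled_by` check can be exercised in isolation. The sketch below re-declares a trimmed copy of the helper (without `affected_packages`, `revision`, and `sandbox`) so it runs standalone; the media buy ID, the reason text, and the rejected value `"agency"` are made-up illustrations.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any


def cancel_media_buy_response(
    media_buy_id: str,
    canceled_by: str,
    *,
    reason: str | None = None,
    canceled_at: str | None = None,
) -> dict[str, Any]:
    # Trimmed local copy of the helper shown above.
    if canceled_by not in ("buyer", "seller"):
        raise ValueError(f"canceled_by must be 'buyer' or 'seller', got {canceled_by!r}")
    resp: dict[str, Any] = {
        "media_buy_id": media_buy_id,
        "status": "canceled",
        "canceled_by": canceled_by,
        "canceled_at": canceled_at or datetime.now(timezone.utc).isoformat(),
        "valid_actions": [],
    }
    if reason is not None:
        resp["reason"] = reason
    return resp


# Only the ID and the canceling party are required; the rest is defaulted.
ok = cancel_media_buy_response("mb-1", "buyer", reason="Campaign postponed")

# Any other canceled_by value is rejected before a response is built.
try:
    cancel_media_buy_response("mb-1", "agency")
    rejected = False
except ValueError:
    rejected = True
```

Passing an explicit `canceled_at` is useful when the cancellation time comes from your own records rather than the moment the response is built.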
def capabilities_response(supported_protocols: list[str],
*,
major_versions: list[int] | None = None,
sandbox: bool = True,
features: dict[str, Any] | None = None,
idempotency: dict[str, Any] | None = None,
compliance_testing: dict[str, Any] | None = None) ‑> dict[str, typing.Any]-
Source code::

    def capabilities_response(
        supported_protocols: list[str],
        *,
        major_versions: list[int] | None = None,
        sandbox: bool = True,
        features: dict[str, Any] | None = None,
        idempotency: dict[str, Any] | None = None,
        compliance_testing: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Build a get_adcp_capabilities response.

        Args:
            supported_protocols: e.g. ["media_buy"], ["media_buy", "signals"].
                Valid values: media_buy, signals, governance, creative, brand,
                sponsored_intelligence. ``compliance_testing`` is NOT a
                protocol -- pass it via the ``compliance_testing`` kwarg.
            major_versions: AdCP major versions. Defaults to [3].
            sandbox: Whether this is a sandbox agent. Defaults to True.
            features: Additional feature flags.
            idempotency: Optional idempotency declaration, nested under
                ``adcp.idempotency`` per AdCP #2315. Pass the output of
                :meth:`adcp.server.idempotency.IdempotencyStore.capability`
                here to declare the seller's ``replay_ttl_seconds``.
            compliance_testing: Optional top-level ``compliance_testing`` block
                to advertise compliance-testing capabilities. When provided,
                emitted as a sibling of ``adcp`` in the response.

        Example::

            from adcp.server.responses import capabilities_response
            from adcp.server.idempotency import IdempotencyStore, MemoryBackend

            store = IdempotencyStore(backend=MemoryBackend(), ttl_seconds=86400)
            return capabilities_response(
                ["media_buy"],
                idempotency=store.capability(),
            )
        """
        adcp_info: dict[str, Any] = {"major_versions": major_versions or [3]}
        if idempotency:
            adcp_info["idempotency"] = idempotency
        resp: dict[str, Any] = {
            "adcp": adcp_info,
            "supported_protocols": supported_protocols,
            "sandbox": sandbox,
        }
        if features:
            resp["features"] = features
        if compliance_testing is not None:
            resp["compliance_testing"] = compliance_testing
        return resp

Build a get_adcp_capabilities response.
Args
supported_protocols- e.g. ["media_buy"], ["media_buy", "signals"]. Valid values: media_buy, signals, governance, creative, brand, sponsored_intelligence. compliance_testing is NOT a protocol; pass it via the compliance_testing kwarg.
major_versions- AdCP major versions. Defaults to [3].
sandbox- Whether this is a sandbox agent. Defaults to True.
features- Additional feature flags.
idempotency- Optional idempotency declaration, nested under adcp.idempotency per AdCP #2315. Pass the output of IdempotencyStore.capability() here to declare the seller's replay_ttl_seconds.
compliance_testing- Optional top-level compliance_testing block to advertise compliance-testing capabilities. When provided, emitted as a sibling of adcp in the response.
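Where each keyword argument ends up in the response is easiest to see by printing one. The sketch below re-declares the builder locally so it runs without the package; the `idempotency` and `compliance_testing` payloads are assumed shapes for illustration only (the actual output of `IdempotencyStore.capability()` is not shown in this page).

```python
from __future__ import annotations

from typing import Any


def capabilities_response(
    supported_protocols: list[str],
    *,
    major_versions: list[int] | None = None,
    sandbox: bool = True,
    features: dict[str, Any] | None = None,
    idempotency: dict[str, Any] | None = None,
    compliance_testing: dict[str, Any] | None = None,
) -> dict[str, Any]:
    # Local copy of the builder shown above.
    adcp_info: dict[str, Any] = {"major_versions": major_versions or [3]}
    if idempotency:
        adcp_info["idempotency"] = idempotency
    resp: dict[str, Any] = {
        "adcp": adcp_info,
        "supported_protocols": supported_protocols,
        "sandbox": sandbox,
    }
    if features:
        resp["features"] = features
    if compliance_testing is not None:
        resp["compliance_testing"] = compliance_testing
    return resp


resp = capabilities_response(
    ["media_buy", "signals"],
    # Assumed payload shape; the real capability() output may differ.
    idempotency={"replay_ttl_seconds": 86400},
    # Hypothetical block contents, passed as a kwarg and not as a protocol.
    compliance_testing={"scenarios": ["force_media_buy_status"]},
)
```

In the result, `idempotency` sits inside `resp["adcp"]`, while `compliance_testing` is a top-level key next to `adcp`.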
Example::

    from adcp.server.responses import capabilities_response
    from adcp.server.idempotency import IdempotencyStore, MemoryBackend

    store = IdempotencyStore(backend=MemoryBackend(), ttl_seconds=86400)
    return capabilities_response(
        ["media_buy"],
        idempotency=store.capability(),
    )

def create_a2a_server(handler: ADCPHandler,
*,
name: str = 'adcp-agent',
port: int | None = None,
description: str | None = None,
version: str = '1.0.0',
test_controller: TestControllerStore | None = None) ‑> Any-
Source code::

    def create_a2a_server(
        handler: ADCPHandler,
        *,
        name: str = "adcp-agent",
        port: int | None = None,
        description: str | None = None,
        version: str = "1.0.0",
        test_controller: TestControllerStore | None = None,
    ) -> Any:
        """Create an A2A Starlette application from an ADCP handler.

        Args:
            handler: An ADCPHandler subclass instance.
            name: Agent name shown in the A2A agent card.
            port: Port number (used in the agent card URL).
            description: Agent description for the agent card.
            version: Agent version string.
            test_controller: Optional TestControllerStore for storyboard testing.

        Returns:
            A Starlette app ready to be run with uvicorn.
        """
        from a2a.server.apps.jsonrpc.starlette_app import A2AStarletteApplication

        resolved_port = port or int(os.environ.get("PORT", "3001"))
        executor = ADCPAgentExecutor(handler, test_controller=test_controller)
        agent_card = _build_agent_card(
            handler,
            name=name,
            port=resolved_port,
            description=description,
            version=version,
            extra_skills=_test_controller_skills() if test_controller else None,
        )
        task_store = InMemoryTaskStore()
        request_handler = DefaultRequestHandler(
            agent_executor=executor,
            task_store=task_store,
        )
        a2a_app = A2AStarletteApplication(
            agent_card=agent_card,
            http_handler=request_handler,
        )
        return a2a_app.build()

Create an A2A Starlette application from an ADCP handler.
Args
handler- An ADCPHandler subclass instance.
name- Agent name shown in the A2A agent card.
port- Port number (used in the agent card URL).
description- Agent description for the agent card.
version- Agent version string.
test_controller- Optional TestControllerStore for storyboard testing.
Returns
A Starlette app ready to be run with uvicorn.
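Both `create_a2a_server` and `create_mcp_server` resolve the port with the same expression, `port or int(os.environ.get("PORT", "3001"))`. A minimal sketch of that fallback order follows; `resolve_port` and its `environ` parameter are hypothetical names introduced here so the behavior can be checked without touching the real environment.

```python
from __future__ import annotations

import os
from collections.abc import Mapping


def resolve_port(port: int | None = None, environ: Mapping[str, str] | None = None) -> int:
    # Hypothetical helper mirroring the expression used in the source above.
    env = os.environ if environ is None else environ
    return port or int(env.get("PORT", "3001"))


explicit = resolve_port(8080, {"PORT": "9000"})  # explicit argument wins
from_env = resolve_port(None, {"PORT": "9000"})  # falls back to PORT
default = resolve_port(None, {})                 # falls back to 3001
zero = resolve_port(0, {})                       # 0 is falsy, so it counts as "not given"
```

Because the fallback uses `or`, passing `port=0` does not request an OS-assigned port; it resolves to the `PORT` variable or 3001 instead.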
def create_mcp_server(handler: ADCPHandler,
*,
name: str = 'adcp-agent',
port: int | None = None,
instructions: str | None = None,
include_test_controller: bool = False) ‑> Any-
Source code::

    def create_mcp_server(
        handler: ADCPHandler,
        *,
        name: str = "adcp-agent",
        port: int | None = None,
        instructions: str | None = None,
        include_test_controller: bool = False,
    ) -> Any:
        """Create a FastMCP server from an ADCP handler without starting it.

        Use this when you need to customize the server before running it,
        or when you need to add extra non-ADCP tools.

        Args:
            handler: An ADCPHandler subclass instance.
            name: Server name.
            port: Port to listen on.
            instructions: Optional system instructions.
            include_test_controller: When False (default), skip registering
                ``comply_test_controller`` as a handler tool. Sellers who want
                compliance-testing support should pass ``test_controller=`` to
                :func:`serve`, which registers a store-backed implementation
                via :func:`register_test_controller` and sets this flag
                implicitly. Registering the handler stub unconditionally would
                advertise a tool the seller didn't opt into.

        Returns:
            A configured FastMCP server instance. Call mcp.run() to start.

        Example:
            mcp = create_mcp_server(MyAgent(), name="my-agent")
            mcp.run(transport="streamable-http")
        """
        from mcp.server.fastmcp import FastMCP

        resolved_port = port or int(os.environ.get("PORT", "3001"))
        mcp = FastMCP(name, instructions=instructions, port=resolved_port)
        _register_handler_tools(mcp, handler, include_test_controller=include_test_controller)
        return mcp

Create a FastMCP server from an ADCP handler without starting it.
Use this when you need to customize the server before running it, or when you need to add extra non-ADCP tools.
Args
handler- An ADCPHandler subclass instance.
name- Server name.
port- Port to listen on.
instructions- Optional system instructions.
include_test_controller- When False (default), skip registering comply_test_controller as a handler tool. Sellers who want compliance-testing support should pass test_controller= to serve(), which registers a store-backed implementation via register_test_controller() and sets this flag implicitly. Registering the handler stub unconditionally would advertise a tool the seller didn't opt into.
Returns
A configured FastMCP server instance. Call mcp.run() to start.
Example

    mcp = create_mcp_server(MyAgent(), name="my-agent")
    mcp.run(transport="streamable-http")

def create_mcp_tools(handler: ADCPHandler) ‑> MCPToolSet-
Source code::

    def create_mcp_tools(handler: ADCPHandler) -> MCPToolSet:
        """Create MCP tools from an ADCP handler.

        This is the main entry point for MCP server integration.

        Example with mcp library:
            from mcp.server import Server
            from adcp.server import ContentStandardsHandler, create_mcp_tools

            class MyHandler(ContentStandardsHandler):
                # ... implement methods

            handler = MyHandler()
            tools = create_mcp_tools(handler)

            server = Server("my-content-agent")

            @server.list_tools()
            async def list_tools():
                return tools.tool_definitions

            @server.call_tool()
            async def call_tool(name: str, arguments: dict):
                return await tools.call_tool(name, arguments)

        Args:
            handler: ADCP handler instance

        Returns:
            MCPToolSet with tool definitions and handlers
        """
        return MCPToolSet(handler)

Create MCP tools from an ADCP handler.
This is the main entry point for MCP server integration.
Example with mcp library:

    from mcp.server import Server
    from adcp.server import ContentStandardsHandler, create_mcp_tools

    class MyHandler(ContentStandardsHandler):
        # ... implement methods
        ...

    handler = MyHandler()
    tools = create_mcp_tools(handler)

    server = Server("my-content-agent")

    @server.list_tools()
    async def list_tools():
        return tools.tool_definitions

    @server.call_tool()
    async def call_tool(name: str, arguments: dict):
        return await tools.call_tool(name, arguments)

Args
handler- ADCP handler instance
Returns
MCPToolSet with tool definitions and handlers
def creative_formats_response(formats: list[Any], *, sandbox: bool = True) ‑> dict[str, typing.Any]-
Source code::

    def creative_formats_response(
        formats: list[Any],
        *,
        sandbox: bool = True,
    ) -> dict[str, Any]:
        """Build a list_creative_formats response.

        Each format should include: format_id ({agent_url, id}), name.
        Matches ListCreativeFormatsResponse schema.
        """
        return {
            "formats": _serialize(formats),
            "sandbox": sandbox,
        }

Build a list_creative_formats response.
Each format should include: format_id ({agent_url, id}), name. Matches ListCreativeFormatsResponse schema.
def delivery_response(media_buy_deliveries: list[dict[str, Any]],
*,
reporting_period: dict[str, str] | None = None,
currency: str = 'USD',
sandbox: bool = True) ‑> dict[str, typing.Any]-
Source code::

    def delivery_response(
        media_buy_deliveries: list[dict[str, Any]],
        *,
        reporting_period: dict[str, str] | None = None,
        currency: str = "USD",
        sandbox: bool = True,
    ) -> dict[str, Any]:
        """Build a get_media_buy_delivery response.

        Each media_buy_delivery should include: media_buy_id, status,
        totals (impressions, spend, etc.), by_package.

        Matches GetMediaBuyDeliveryResponse schema.

        Args:
            media_buy_deliveries: Array of delivery data per media buy.
            reporting_period: {"start": ISO timestamp, "end": ISO timestamp}.
                Defaults to current timestamp for both.
            currency: ISO 4217 currency code.
            sandbox: Whether this is simulated data.
        """
        now = datetime.now(timezone.utc).isoformat()
        return {
            "reporting_period": reporting_period or {"start": now, "end": now},
            "media_buy_deliveries": media_buy_deliveries,
            "currency": currency,
            "sandbox": sandbox,
        }

Build a get_media_buy_delivery response.
Each media_buy_delivery should include: media_buy_id, status, totals (impressions, spend, etc.), by_package.
Matches GetMediaBuyDeliveryResponse schema.
Args
media_buy_deliveries- Array of delivery data per media buy.
reporting_period- {"start": ISO timestamp, "end": ISO timestamp}. Defaults to current timestamp for both.
currency- ISO 4217 currency code.
sandbox- Whether this is simulated data.
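Since the default `reporting_period` uses the same timestamp for start and end, omitting it yields a zero-length window. The sketch below uses a local copy of the helper to contrast the default with an explicit period; the delivery figures and dates are invented sample data.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any


def delivery_response(
    media_buy_deliveries: list[dict[str, Any]],
    *,
    reporting_period: dict[str, str] | None = None,
    currency: str = "USD",
    sandbox: bool = True,
) -> dict[str, Any]:
    # Local copy of the helper shown above.
    now = datetime.now(timezone.utc).isoformat()
    return {
        "reporting_period": reporting_period or {"start": now, "end": now},
        "media_buy_deliveries": media_buy_deliveries,
        "currency": currency,
        "sandbox": sandbox,
    }


# Default: start and end are the same instant.
implicit = delivery_response([])

# Explicit period with one made-up delivery record.
explicit = delivery_response(
    [
        {
            "media_buy_id": "mb-1",
            "status": "active",
            "totals": {"impressions": 1000, "spend": 12.5},
            "by_package": [],
        }
    ],
    reporting_period={
        "start": "2025-01-01T00:00:00+00:00",
        "end": "2025-01-31T23:59:59+00:00",
    },
)
```

For real reporting, passing the period the totals were actually computed over is likely the safer choice.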
def error_response(code: str, message: str) ‑> dict[str, typing.Any]-
Source code::

    def error_response(code: str, message: str) -> dict[str, Any]:
        """Build a single AdCP error object (not a full error response).

        .. deprecated::
            Use ``adcp_error()`` from ``adcp.server.helpers`` instead. It returns
            a properly wrapped ``{"errors": [...]}`` response with auto-recovery
            classification. This function returns an unwrapped single error dict
            ``{"code": ..., "message": ...}`` which is not a valid ADCP error
            response on its own.
        """
        return {"code": code, "message": message}

Build a single AdCP error object (not a full error response).
Deprecated
Use adcp_error() from adcp.server.helpers instead. It returns a properly wrapped {"errors": [...]} response with auto-recovery classification. This function returns an unwrapped single error dict {"code": ..., "message": ...} which is not a valid ADCP error response on its own.
def get_tools_for_handler(handler: ADCPHandler | type[ADCPHandler]) ‑> list[dict[str, typing.Any]]-
Source code::

    def get_tools_for_handler(handler: ADCPHandler | type[ADCPHandler]) -> list[dict[str, Any]]:
        """Return tool definitions filtered by handler type.

        Walks the MRO to find the matching handler base class, so subclasses
        (e.g. MyGovernanceAgent(GovernanceHandler)) get the correct tool set.
        ADCPHandler gets all tools. Unknown handlers get only protocol
        discovery (minimum privilege).

        Args:
            handler: The handler instance or class

        Returns:
            Filtered list of tool definitions
        """
        cls = handler if isinstance(handler, type) else type(handler)
        for base in cls.__mro__:
            if base.__name__ in _HANDLER_TOOLS:
                allowed = _HANDLER_TOOLS[base.__name__] | _PROTOCOL_TOOLS
                return [tool for tool in ADCP_TOOL_DEFINITIONS if tool["name"] in allowed]
        return [tool for tool in ADCP_TOOL_DEFINITIONS if tool["name"] in _PROTOCOL_TOOLS]

Return tool definitions filtered by handler type.
Walks the MRO to find the matching handler base class, so subclasses (e.g. MyGovernanceAgent(GovernanceHandler)) get the correct tool set. ADCPHandler gets all tools. Unknown handlers get only protocol discovery (minimum privilege).
Args
handler- The handler instance or class
Returns
Filtered list of tool definitions
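The MRO walk can be illustrated with stand-in classes and tables. Everything below is a self-contained sketch: the handler classes are empty stubs, and the contents of `_HANDLER_TOOLS`, `_PROTOCOL_TOOLS`, and `ADCP_TOOL_DEFINITIONS` are hypothetical (in particular, `check_governance` is an invented tool name); only the lookup logic follows the source above.

```python
from typing import Any


# Empty stand-ins for the library's handler classes.
class ADCPHandler: ...
class GovernanceHandler(ADCPHandler): ...
class MyGovernanceAgent(GovernanceHandler): ...
class Unrelated: ...


# Hypothetical tables; the real ones list many more tools.
_PROTOCOL_TOOLS = {"get_adcp_capabilities"}
_HANDLER_TOOLS = {
    "ADCPHandler": {"get_products", "create_media_buy", "check_governance"},
    "GovernanceHandler": {"check_governance"},
}
ADCP_TOOL_DEFINITIONS: list[dict[str, Any]] = [
    {"name": "get_adcp_capabilities"},
    {"name": "get_products"},
    {"name": "create_media_buy"},
    {"name": "check_governance"},
]


def tool_names(handler: Any) -> list[str]:
    # Same lookup as get_tools_for_handler, returning names only.
    cls = handler if isinstance(handler, type) else type(handler)
    for base in cls.__mro__:
        # The first class in the MRO whose name is in the table decides the set.
        if base.__name__ in _HANDLER_TOOLS:
            allowed = _HANDLER_TOOLS[base.__name__] | _PROTOCOL_TOOLS
            return [t["name"] for t in ADCP_TOOL_DEFINITIONS if t["name"] in allowed]
    # No known base class: protocol discovery only.
    return [t["name"] for t in ADCP_TOOL_DEFINITIONS if t["name"] in _PROTOCOL_TOOLS]


subclass_tools = tool_names(MyGovernanceAgent())
base_tools = tool_names(ADCPHandler)
unknown_tools = tool_names(Unrelated())
```

`MyGovernanceAgent` matches `GovernanceHandler` before `ADCPHandler` because the MRO lists the nearest base first, which is why the subclass receives the narrower tool set.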
def inject_context(params: dict[str, Any], response: dict[str, Any], *, max_size: int = 65536) ‑> dict[str, typing.Any]-
Source code::

    def inject_context(
        params: dict[str, Any],
        response: dict[str, Any],
        *,
        max_size: int = _MAX_CONTEXT_SIZE,
    ) -> dict[str, Any]:
        """Auto-inject context passthrough from request into response.

        ADCP requires that if a request contains a ``context`` field,
        the response must echo it back unchanged. A size limit prevents
        resource amplification from oversized context payloads.

        The context field is opaque and may contain attacker-controlled
        data -- do not interpret or display its contents.
        """
        if "context" in params and "context" not in response:
            import json

            ctx = params["context"]
            if len(json.dumps(ctx, default=str)) <= max_size:
                response["context"] = ctx
        return response

Auto-inject context passthrough from request into response.
ADCP requires that if a request contains a context field, the response must echo it back unchanged. A size limit prevents resource amplification from oversized context payloads.
The context field is opaque and may contain attacker-controlled data; do not interpret or display its contents.
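The three outcomes of the passthrough (echoed, dropped for size, left alone) can be checked with a local copy of the function. The request and response payloads are made-up examples, and `max_size=32` is chosen only to trigger the size limit.

```python
import json
from typing import Any


def inject_context(
    params: dict[str, Any],
    response: dict[str, Any],
    *,
    max_size: int = 65536,
) -> dict[str, Any]:
    # Local copy of the helper shown above (json imported at module level).
    if "context" in params and "context" not in response:
        ctx = params["context"]
        if len(json.dumps(ctx, default=str)) <= max_size:
            response["context"] = ctx
    return response


# Small context: echoed back unchanged.
echoed = inject_context({"context": {"trace_id": "abc"}}, {"products": []})

# Serialized context exceeds max_size: silently omitted, no error raised.
dropped = inject_context({"context": {"blob": "x" * 100}}, {"products": []}, max_size=32)

# Response already carries a context: the existing value is kept.
kept = inject_context({"context": {"a": 1}}, {"context": {"b": 2}})
```

The function mutates and returns the same `response` dict, and an oversized context is dropped without any signal to the caller, so a handler that needs to report the omission has to check for it separately.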
def is_terminal_status(status: str) ‑> bool-
Source code::

    def is_terminal_status(status: str) -> bool:
        """Check if a media buy status is terminal (no further actions)."""
        return status in ("completed", "rejected", "canceled")

Check if a media buy status is terminal (no further actions).
def list_creatives_response(creatives: list[Any],
*,
pagination: dict[str, Any] | None = None,
sandbox: bool = True) ‑> dict[str, typing.Any]-
Source code::

    def list_creatives_response(
        creatives: list[Any],
        *,
        pagination: dict[str, Any] | None = None,
        sandbox: bool = True,
    ) -> dict[str, Any]:
        """Build a list_creatives response.

        Each creative should include: creative_id, name, format_id, status.
        Matches ListCreativesResponse schema.

        Timestamp defaults: every Creative item in the spec requires
        ``created_date`` and ``updated_date`` (ISO 8601 UTC). For any dict item
        that omits either field, this helper fills it with the current UTC
        timestamp (``datetime.now(timezone.utc).isoformat()``). Both fields
        default to the same value when neither is provided, which matches the
        intuitive meaning for a freshly-listed item. Explicit caller-provided
        values are always preserved. Pydantic model items are passed through
        ``_serialize`` unchanged -- callers using typed Creative models should
        set timestamps on the model.
        """
        now = datetime.now(timezone.utc).isoformat()
        filled: list[Any] = []
        for item in creatives:
            if isinstance(item, dict):
                has_created = "created_date" in item and item["created_date"] is not None
                has_updated = "updated_date" in item and item["updated_date"] is not None
                if has_created and has_updated:
                    filled.append(item)
                    continue
                patched = dict(item)
                if not has_created:
                    patched["created_date"] = now
                if not has_updated:
                    patched["updated_date"] = now
                filled.append(patched)
            else:
                filled.append(item)

        count = len(filled)
        return {
            "creatives": _serialize(filled),
            "pagination": pagination or {"total": count, "has_more": False},
            "query_summary": {"total_results": count, "total_matching": count, "returned": count},
            "sandbox": sandbox,
        }

Build a list_creatives response.
Each creative should include: creative_id, name, format_id, status. Matches ListCreativesResponse schema.
Timestamp defaults: every Creative item in the spec requires created_date and updated_date (ISO 8601 UTC). For any dict item that omits either field, this helper fills it with the current UTC timestamp (datetime.now(timezone.utc).isoformat()). Both fields default to the same value when neither is provided, which matches the intuitive meaning for a freshly-listed item. Explicit caller-provided values are always preserved. Pydantic model items are passed through _serialize unchanged; callers using typed Creative models should set timestamps on the model.
def log_event_response(events_received: int, events_processed: int, *, sandbox: bool = True) ‑> dict[str, typing.Any]-
Source code::

    def log_event_response(
        events_received: int,
        events_processed: int,
        *,
        sandbox: bool = True,
    ) -> dict[str, Any]:
        """Build a log_event success response.

        Matches LogEventResponse1 (success) schema.
        """
        return {
            "events_received": events_received,
            "events_processed": events_processed,
            "sandbox": sandbox,
        }

Build a log_event success response.
Matches LogEventResponse1 (success) schema.
def media_buy_error_response(errors: list[dict[str, str]]) ‑> dict[str, typing.Any]-
Source code::

    def media_buy_error_response(errors: list[dict[str, str]]) -> dict[str, Any]:
        """Build a create_media_buy error response.

        Each error dict: {"code": "...", "message": "..."}.
        Matches CreateMediaBuyResponse2 (error) schema.
        """
        return {"errors": errors}

Build a create_media_buy error response.
Each error dict: {"code": "…", "message": "…"}. Matches CreateMediaBuyResponse2 (error) schema.
def media_buy_response(media_buy_id: str,
packages: list[Any],
*,
buyer_ref: str | None = None,
status: str | None = None,
valid_actions: list[str] | None = None,
revision: int | None = None,
confirmed_at: str | None = None,
sandbox: bool = True) ‑> dict[str, typing.Any]-
Source code::

    def media_buy_response(
        media_buy_id: str,
        packages: list[Any],
        *,
        buyer_ref: str | None = None,
        status: str | None = None,
        valid_actions: list[str] | None = None,
        revision: int | None = None,
        confirmed_at: str | None = None,
        sandbox: bool = True,
    ) -> dict[str, Any]:
        """Build a create_media_buy success response.

        Each package should include: package_id, product_id, pricing_option_id, budget.
        Matches CreateMediaBuyResponse1 (success) schema.

        Auto-populates valid_actions from status if not provided.
        Auto-sets revision to 1 and confirmed_at to now if not provided.
        """
        resp: dict[str, Any] = {
            "media_buy_id": media_buy_id,
            "packages": _serialize(packages),
            "revision": revision if revision is not None else 1,
            "confirmed_at": confirmed_at or datetime.now(timezone.utc).isoformat(),
            "sandbox": sandbox,
        }
        if buyer_ref is not None:
            resp["buyer_ref"] = buyer_ref
        if status is not None:
            resp["status"] = status
            if valid_actions is None:
                resp["valid_actions"] = valid_actions_for_status(status)
            else:
                resp["valid_actions"] = valid_actions
        elif valid_actions is not None:
            resp["valid_actions"] = valid_actions
        return resp

Build a create_media_buy success response.
Each package should include: package_id, product_id, pricing_option_id, budget. Matches CreateMediaBuyResponse1 (success) schema.
Auto-populates valid_actions from status if not provided. Auto-sets revision to 1 and confirmed_at to now if not provided.
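The precedence between `status` and `valid_actions` is the subtle part of this helper. The sketch below isolates that branch in a hypothetical function, `resolve_valid_actions`; the status-to-actions mapping is invented for illustration, since the real `valid_actions_for_status` table is not shown in this page.

```python
from __future__ import annotations

from typing import Any

# Hypothetical mapping; the library's real table may list other actions.
_ACTIONS_BY_STATUS: dict[str, list[str]] = {
    "active": ["pause", "cancel"],
    "canceled": [],
}


def valid_actions_for_status(status: str) -> list[str]:
    # Stand-in for the library function of the same name.
    return list(_ACTIONS_BY_STATUS.get(status, []))


def resolve_valid_actions(
    status: str | None,
    valid_actions: list[str] | None,
) -> dict[str, Any]:
    # Mirrors the status/valid_actions branch of media_buy_response.
    fields: dict[str, Any] = {}
    if status is not None:
        fields["status"] = status
        if valid_actions is None:
            fields["valid_actions"] = valid_actions_for_status(status)
        else:
            fields["valid_actions"] = valid_actions
    elif valid_actions is not None:
        fields["valid_actions"] = valid_actions
    return fields


derived = resolve_valid_actions("active", None)         # derived from status
explicit = resolve_valid_actions("active", ["cancel"])  # caller's list wins
neither = resolve_valid_actions(None, None)             # both keys omitted
actions_only = resolve_valid_actions(None, ["pause"])   # passed through as-is
```

An explicit `valid_actions` list always takes precedence, so the auto-population only applies when a status is given and no list is supplied.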
def media_buys_response(media_buys: list[Any], *, sandbox: bool = True) ‑> dict[str, typing.Any]-
Source code::

    def media_buys_response(
        media_buys: list[Any],
        *,
        sandbox: bool = True,
    ) -> dict[str, Any]:
        """Build a get_media_buys response.

        Each media buy should include: media_buy_id, status, currency, packages.
        Matches GetMediaBuysResponse schema.
        """
        return {
            "media_buys": _serialize(media_buys),
            "sandbox": sandbox,
        }

Build a get_media_buys response.
Each media buy should include: media_buy_id, status, currency, packages. Matches GetMediaBuysResponse schema.
def not_supported(reason: str = 'This operation is not supported by this agent') ‑> NotImplementedResponse-
Source code::

    def not_supported(
        reason: str = "This operation is not supported by this agent",
    ) -> NotImplementedResponse:
        """Create a standard 'not supported' response.

        Use this to return from operations that your agent does not implement.

        Args:
            reason: Human-readable explanation of why the operation is not supported

        Returns:
            NotImplementedResponse with supported=False
        """
        return NotImplementedResponse(
            supported=False,
            reason=reason,
            error=Error(
                code="NOT_SUPPORTED",
                message=reason,
            ),
        )

Create a standard 'not supported' response.
Use this to return from operations that your agent does not implement.
Args
reason- Human-readable explanation of why the operation is not supported
Returns
NotImplementedResponse with supported=False
def preview_creative_response(previews: list[dict[str, Any]],
*,
expires_at: str | None = None,
sandbox: bool = True) ‑> dict[str, typing.Any]-
Source code::

    def preview_creative_response(
        previews: list[dict[str, Any]],
        *,
        expires_at: str | None = None,
        sandbox: bool = True,
    ) -> dict[str, Any]:
        """Build a preview_creative single response.

        Each preview should include: preview_id, input ({format_id, name, assets}),
        renders ([{render_id, output_format, preview_url, role, dimensions}]).

        Matches PreviewCreativeResponse1 (single) schema.
        """
        return {
            "response_type": "single",
            "previews": previews,
            "expires_at": expires_at or "2099-12-31T23:59:59Z",
            "sandbox": sandbox,
        }

Build a preview_creative single response.
Each preview should include: preview_id, input ({format_id, name, assets}), renders ([{render_id, output_format, preview_url, role, dimensions}]).
Matches PreviewCreativeResponse1 (single) schema.
def products_response(products: list[Any], *, item_count: int | None = None, sandbox: bool = True) ‑> dict[str, typing.Any]-
Source code::

    def products_response(
        products: list[Any],
        *,
        item_count: int | None = None,
        sandbox: bool = True,
    ) -> dict[str, Any]:
        """Build a get_products response.

        Matches GetProductsResponse schema.
        """
        serialized = _serialize(products)
        resp: dict[str, Any] = {
            "products": serialized,
            "item_count": item_count if item_count is not None else len(serialized),
            "sandbox": sandbox,
        }
        return resp

Build a get_products response.
Matches GetProductsResponse schema.
def register_test_controller(mcp: Any,
store: TestControllerStore) ‑> None-
def register_test_controller(mcp: Any, store: TestControllerStore) -> None:
    """Register the comply_test_controller tool on an MCP server.

    This is the Python equivalent of the JS SDK's registerTestController().
    It adds the comply_test_controller MCP tool backed by your TestControllerStore.

    Args:
        mcp: A FastMCP server instance.
        store: Your TestControllerStore implementation.

    Example:
        from adcp.server.test_controller import TestControllerStore, register_test_controller

        class MyStore(TestControllerStore):
            async def force_account_status(self, account_id, status):
                old = self.accounts[account_id]["status"]
                self.accounts[account_id]["status"] = status
                return {"previous_state": old, "current_state": status}

        mcp = create_mcp_server(MySeller(), name="my-agent")
        register_test_controller(mcp, MyStore())
        mcp.run(transport="streamable-http")
    """
    from mcp.server.fastmcp.tools import Tool
    from mcp.server.fastmcp.utilities.func_metadata import ArgModelBase, FuncMetadata
    from pydantic import ConfigDict

    async def comply_test_controller(**kwargs: Any) -> str:
        result = await _handle_test_controller(store, kwargs)
        return json.dumps(result)

    tool = Tool.from_function(
        comply_test_controller,
        name="comply_test_controller",
        description="Compliance test controller. Sandbox only, not for production use.",
    )

    # Override schema with the proper comply_test_controller inputSchema
    tool.parameters = {
        "type": "object",
        "properties": {
            "scenario": {
                "type": "string",
                "enum": [
                    "list_scenarios",
                    "force_creative_status",
                    "force_account_status",
                    "force_media_buy_status",
                    "force_session_status",
                    "simulate_delivery",
                    "simulate_budget_spend",
                ],
            },
            "params": {"type": "object"},
            "context": {"type": "object"},
        },
        "required": ["scenario"],
    }

    # Override fn_metadata with a permissive model
    class _ControllerArgs(ArgModelBase):
        model_config = ConfigDict(extra="allow")

        def model_dump_one_level(self) -> dict[str, Any]:
            result: dict[str, Any] = {}
            for field_name in self.__class__.model_fields:
                result[field_name] = getattr(self, field_name)
            if self.model_extra:
                result.update(self.model_extra)
            return result

    tool.fn_metadata = FuncMetadata(
        arg_model=_ControllerArgs,
        output_schema=tool.fn_metadata.output_schema,
        output_model=tool.fn_metadata.output_model,
        wrap_output=tool.fn_metadata.wrap_output,
    )
    mcp._tool_manager._tools["comply_test_controller"] = tool

Register the comply_test_controller tool on an MCP server.
This is the Python equivalent of the JS SDK's registerTestController(). It adds the comply_test_controller MCP tool backed by your TestControllerStore.
Args
mcp- A FastMCP server instance.
store- Your TestControllerStore implementation.
Example
    from adcp.server.test_controller import TestControllerStore, register_test_controller

    class MyStore(TestControllerStore):
        async def force_account_status(self, account_id, status):
            old = self.accounts[account_id]["status"]
            self.accounts[account_id]["status"] = status
            return {"previous_state": old, "current_state": status}

    mcp = create_mcp_server(MySeller(), name="my-agent")
    register_test_controller(mcp, MyStore())
    mcp.run(transport="streamable-http")
async def resolve_account(params: dict[str, Any], resolver: AccountResolver | None) ‑> tuple[typing.Any | None, dict[str, typing.Any] | None]-
async def resolve_account(
    params: dict[str, Any],
    resolver: AccountResolver | None,
) -> tuple[Any | None, dict[str, Any] | None]:
    """Resolve an account reference from request params.

    Returns (account, None) on success, (None, error_dict) on failure,
    or (None, None) if no account field or no resolver configured.

    The resolver can return None (auto-ACCOUNT_NOT_FOUND) or raise
    ``AccountError`` for specific error codes (ACCOUNT_SUSPENDED,
    ACCOUNT_PAYMENT_REQUIRED, ACCOUNT_AMBIGUOUS, etc.).
    """
    if resolver is None or "account" not in params:
        return None, None

    try:
        account = await resolver(params["account"])
    except AccountError as e:
        return None, adcp_error(
            e.code,
            e.error_message,
            field="account",
            suggestion=e.suggestion,
        )

    if account is None:
        return None, adcp_error(
            "ACCOUNT_NOT_FOUND",
            "The specified account does not exist",
            field="account",
            suggestion="Use list_accounts to discover available accounts, "
            "or sync_accounts to create one",
        )

    return account, None

Resolve an account reference from request params.
Returns (account, None) on success, (None, error_dict) on failure, or (None, None) if no account field or no resolver configured.
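The return shapes can be sketched with local stand-ins so the example runs without the adcp package. The exception class, the error-dict builder, the account store, and the shape of the account reference ({"account_id": ...}) are all assumptions made for illustration; only the control flow follows the source above.

```python
from __future__ import annotations

import asyncio
from typing import Any


class AccountErrorStandIn(Exception):
    """Hypothetical stand-in for AccountError (attribute names follow the source above)."""

    def __init__(self, code: str, message: str, suggestion: str | None = None) -> None:
        super().__init__(message)
        self.code = code
        self.error_message = message
        self.suggestion = suggestion


def error_stand_in(code: str, message: str, **extra: Any) -> dict[str, Any]:
    # Assumption: the real adcp_error() returns a richer dict than this.
    return {"code": code, "message": message, **extra}


async def resolve_account_sketch(params: dict[str, Any], resolver: Any):
    if resolver is None or "account" not in params:
        return None, None  # nothing to resolve
    try:
        account = await resolver(params["account"])
    except AccountErrorStandIn as e:
        return None, error_stand_in(
            e.code, e.error_message, field="account", suggestion=e.suggestion
        )
    if account is None:
        return None, error_stand_in(
            "ACCOUNT_NOT_FOUND", "The specified account does not exist", field="account"
        )
    return account, None


ACCOUNTS = {"acct-1": {"account_id": "acct-1", "status": "active"}}


async def my_resolver(ref: dict[str, Any]):
    if ref.get("account_id") == "acct-suspended":
        raise AccountErrorStandIn("ACCOUNT_SUSPENDED", "Account is suspended")
    return ACCOUNTS.get(ref.get("account_id"))  # None means "not found"


async def demo():
    return [
        await resolve_account_sketch({"account": {"account_id": "acct-1"}}, my_resolver),
        await resolve_account_sketch({"account": {"account_id": "missing"}}, my_resolver),
        await resolve_account_sketch({"account": {"account_id": "acct-suspended"}}, my_resolver),
        await resolve_account_sketch({}, my_resolver),
    ]


found, missing, suspended, skipped = asyncio.run(demo())
```

A handler would typically check the second element first and return it as the response when it is not None.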
The resolver can return None (auto-ACCOUNT_NOT_FOUND) or raise AccountError for specific error codes (ACCOUNT_SUSPENDED, ACCOUNT_PAYMENT_REQUIRED, ACCOUNT_AMBIGUOUS, etc.).
def serve(handler: ADCPHandler | Any,
*,
name: str = 'adcp-agent',
port: int | None = None,
transport: str = 'streamable-http',
instructions: str | None = None,
test_controller: TestControllerStore | None = None) ‑> None-
def serve(
    handler: ADCPHandler | Any,
    *,
    name: str = "adcp-agent",
    port: int | None = None,
    transport: str = "streamable-http",
    instructions: str | None = None,
    test_controller: TestControllerStore | None = None,
) -> None:
    """Start an MCP or A2A server from an ADCP handler or server builder.

    Accepts either an ``ADCPHandler`` instance or an ``ADCPServerBuilder``
    (from ``adcp_server()``). Builders are auto-converted via ``build_handler()``.

    This is the simplest way to run an ADCP agent. Set ``transport="a2a"``
    to serve over the A2A protocol instead of MCP.

    Args:
        handler: An ADCPHandler subclass instance with your tool implementations.
        name: Server name shown to clients / in the A2A agent card.
        port: Port to listen on. Defaults to PORT env var, then 3001.
        transport: ``"streamable-http"`` (default, MCP) or ``"a2a"``.
        instructions: Optional system instructions for the agent (MCP only).
        test_controller: Optional TestControllerStore instance for storyboard testing.

    Security:
        This function does NOT configure authentication. In production,
        use a reverse proxy or middleware that validates credentials
        before forwarding to the endpoint. Without authentication, MCP
        exposes tools/list and A2A exposes /.well-known/agent.json, both
        of which reveal the agent's full capability surface.

    Example (MCP):
        from adcp.server import ADCPHandler, serve
        from adcp.server.responses import capabilities_response

        class MyAgent(ADCPHandler):
            async def get_adcp_capabilities(self, params, context=None):
                return capabilities_response(["media_buy"])

        serve(MyAgent(), name="my-agent")

    Example (A2A):
        serve(MyAgent(), name="my-agent", transport="a2a")

    With test controller:
        from adcp.server.test_controller import TestControllerStore

        class MyStore(TestControllerStore):
            async def force_account_status(self, account_id, status):
                ...

        serve(MyAgent(), name="my-agent", test_controller=MyStore())
    """
    # Accept ADCPServerBuilder from adcp_server() decorator pattern
    from adcp.server.builder import ADCPServerBuilder

    if isinstance(handler, ADCPServerBuilder):
        if not name or name == "adcp-agent":
            name = handler.name
        handler = handler.build_handler()

    if transport == "a2a":
        _serve_a2a(handler, name=name, port=port, test_controller=test_controller)
    elif transport in ("streamable-http", "sse", "stdio"):
        _serve_mcp(
            handler,
            name=name,
            port=port,
            transport=transport,
            instructions=instructions,
            test_controller=test_controller,
        )
    else:
        valid = ", ".join(sorted(("a2a", "streamable-http", "sse", "stdio")))
        raise ValueError(f"Unknown transport {transport!r}. Valid: {valid}")

Start an MCP or A2A server from an ADCP handler or server builder.
Accepts either an ADCPHandler instance or an ADCPServerBuilder (from adcp_server()). Builders are auto-converted via build_handler().
This is the simplest way to run an ADCP agent. Set transport="a2a" to serve over the A2A protocol instead of MCP.
Args
handler- An ADCPHandler subclass instance with your tool implementations.
name- Server name shown to clients / in the A2A agent card.
port- Port to listen on. Defaults to PORT env var, then 3001.
transport- "streamable-http" (default, MCP) or "a2a"; the source above also accepts "sse" and "stdio" and raises ValueError for any other value.
instructions- Optional system instructions for the agent (MCP only).
test_controller- Optional TestControllerStore instance for storyboard testing.
Security
This function does NOT configure authentication. In production, use a reverse proxy or middleware that validates credentials before forwarding to the endpoint. Without authentication, MCP exposes tools/list and A2A exposes /.well-known/agent.json, both of which reveal the agent's full capability surface.
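One way to picture the middleware option is a small ASGI wrapper that rejects HTTP requests lacking an expected bearer token before they reach the wrapped application. This is only a sketch under assumptions: the class name and token handling are hypothetical, it is not part of adcp, and this documentation does not say how (or whether) such a wrapper can be attached to the server that serve() starts. A reverse proxy achieves the same effect outside the process.

```python
import asyncio
import hmac


class BearerTokenGate:
    """Hypothetical ASGI middleware; not provided by adcp."""

    def __init__(self, app, token: str) -> None:
        self.app = app
        self.expected = f"Bearer {token}".encode()

    async def __call__(self, scope, receive, send) -> None:
        # Only HTTP requests are gated in this sketch; lifespan and other
        # scope types pass through unchanged.
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        # ASGI delivers headers as (name, value) byte pairs with lowercase names.
        headers = dict(scope.get("headers", []))
        supplied = headers.get(b"authorization", b"")

        # Constant-time comparison avoids leaking the token through timing.
        if not hmac.compare_digest(supplied, self.expected):
            await send(
                {
                    "type": "http.response.start",
                    "status": 401,
                    "headers": [(b"www-authenticate", b"Bearer")],
                }
            )
            await send({"type": "http.response.body", "body": b"unauthorized"})
            return

        await self.app(scope, receive, send)


async def inner_app(scope, receive, send) -> None:
    # Stand-in for the real agent endpoint.
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"ok"})


async def call(headers):
    sent = []

    async def send(message):
        sent.append(message)

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    gate = BearerTokenGate(inner_app, token="s3cret")
    await gate({"type": "http", "headers": headers}, receive, send)
    return sent[0]["status"]


status_without = asyncio.run(call([]))
status_with = asyncio.run(call([(b"authorization", b"Bearer s3cret")]))
```

In a real deployment the token would come from configuration or a secrets store rather than a literal, and static bearer tokens are only one of several possible credential schemes.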
Example (MCP):
    from adcp.server import ADCPHandler, serve
    from adcp.server.responses import capabilities_response

    class MyAgent(ADCPHandler):
        async def get_adcp_capabilities(self, params, context=None):
            return capabilities_response(["media_buy"])

    serve(MyAgent(), name="my-agent")
Example (A2A):
    serve(MyAgent(), name="my-agent", transport="a2a")
With test controller:
    from adcp.server.test_controller import TestControllerStore

    class MyStore(TestControllerStore):
        async def force_account_status(self, account_id, status):
            ...

    serve(MyAgent(), name="my-agent", test_controller=MyStore())
def signals_response(signals: list[Any], *, sandbox: bool = True) ‑> dict[str, typing.Any]-
def signals_response(
    signals: list[Any],
    *,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a get_signals response.

    Each signal should include: signal_agent_segment_id, name, description,
    signal_type, data_provider, coverage_percentage, deployments,
    pricing_options, signal_id.

    Matches GetSignalsResponse schema.
    """
    return {
        "signals": _serialize(signals),
        "sandbox": sandbox,
    }

Build a get_signals response.
Each signal should include: signal_agent_segment_id, name, description, signal_type, data_provider, coverage_percentage, deployments, pricing_options, signal_id. Matches GetSignalsResponse schema.
def sync_accounts_response(accounts: list[dict[str, Any]], *, sandbox: bool = True) ‑> dict[str, typing.Any]-
def sync_accounts_response(
    accounts: list[dict[str, Any]],
    *,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a sync_accounts success response.

    Each account dict should include: account_id, brand, operator,
    action ("created"|"updated"), status ("active"|"pending_approval").

    Matches SyncAccountsResponse1 schema (field: "accounts").
    """
    return {"accounts": accounts, "sandbox": sandbox}

Build a sync_accounts success response.
Each account dict should include: account_id, brand, operator, action ("created"|"updated"), status ("active"|"pending_approval").
Matches SyncAccountsResponse1 schema (field: "accounts").
def sync_catalogs_response(catalogs: list[dict[str, Any]], *, sandbox: bool = True) ‑> dict[str, typing.Any]-
def sync_catalogs_response(
    catalogs: list[dict[str, Any]],
    *,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a sync_catalogs success response.

    Each catalog should include: catalog_id, action, item_count, items_approved.

    Matches SyncCatalogsResponse1 (success) schema.
    """
    return {
        "catalogs": catalogs,
        "sandbox": sandbox,
    }

Build a sync_catalogs success response.
Each catalog should include: catalog_id, action, item_count, items_approved. Matches SyncCatalogsResponse1 (success) schema.
def sync_creatives_response(creatives: list[dict[str, Any]], *, sandbox: bool = True) ‑> dict[str, typing.Any]-
def sync_creatives_response(
    creatives: list[dict[str, Any]],
    *,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a sync_creatives success response.

    Each creative dict should include: creative_id, action ("created"|"updated").
    Optionally: status ("processing"|"pending_review"|"approved"|"rejected"|"archived").

    Matches SyncCreativesResponse1 schema (field: "creatives").
    """
    return {"creatives": creatives, "sandbox": sandbox}

Build a sync_creatives success response.
Each creative dict should include: creative_id, action ("created"|"updated"). Optionally: status ("processing"|"pending_review"|"approved"|"rejected"|"archived"). Matches SyncCreativesResponse1 schema (field: "creatives").
def sync_governance_response(accounts: list[dict[str, Any]], *, sandbox: bool = True) ‑> dict[str, typing.Any]-
def sync_governance_response(
    accounts: list[dict[str, Any]],
    *,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a sync_governance response.

    Each account dict should include: account, status ("synced"),
    governance_agents ([{url, categories}]).
    """
    return {"accounts": accounts, "sandbox": sandbox}

Build a sync_governance response.
Each account dict should include: account, status ("synced"), governance_agents ([{url, categories}]).
def update_media_buy_response(media_buy_id: str,
*,
affected_packages: list[Any] | None = None,
status: str | None = None,
valid_actions: list[str] | None = None,
revision: int | None = None,
sandbox: bool = True) ‑> dict[str, typing.Any]-
def update_media_buy_response(
    media_buy_id: str,
    *,
    affected_packages: list[Any] | None = None,
    status: str | None = None,
    valid_actions: list[str] | None = None,
    revision: int | None = None,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build an update_media_buy success response.

    Matches UpdateMediaBuyResponse1 (success) schema.
    Auto-populates valid_actions from status if not provided.
    """
    resp: dict[str, Any] = {
        "media_buy_id": media_buy_id,
        "sandbox": sandbox,
    }
    if affected_packages is not None:
        resp["affected_packages"] = _serialize(affected_packages)
    if status is not None:
        resp["status"] = status
        if valid_actions is None:
            resp["valid_actions"] = valid_actions_for_status(status)
        else:
            resp["valid_actions"] = valid_actions
    elif valid_actions is not None:
        resp["valid_actions"] = valid_actions
    if revision is not None:
        resp["revision"] = revision
    return resp

Build an update_media_buy success response.
Matches UpdateMediaBuyResponse1 (success) schema. Auto-populates valid_actions from status if not provided.
def valid_actions_for_status(status: str) ‑> list[str]-
def valid_actions_for_status(status: str) -> list[str]:
    """Get valid buyer actions for a media buy status."""
    return list(MEDIA_BUY_STATE_MACHINE.get(status, []))

Get valid buyer actions for a media buy status.
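A standalone sketch of how this lookup feeds update_media_buy_response. The transition table below is hypothetical (the contents of MEDIA_BUY_STATE_MACHINE are not shown in this section); only the lookup and the branch order follow the source above, and the affected_packages, revision, and sandbox fields are left out for brevity.

```python
from __future__ import annotations

from typing import Any

# Hypothetical table: the real statuses and actions may differ.
STATE_MACHINE_EXAMPLE: dict[str, list[str]] = {
    "active": ["pause", "cancel"],
    "paused": ["resume", "cancel"],
    "canceled": [],
}


def actions_for(status: str) -> list[str]:
    # list(...) returns a fresh copy, so callers cannot mutate the table;
    # unknown statuses fall back to an empty list.
    return list(STATE_MACHINE_EXAMPLE.get(status, []))


def build_update_response(
    media_buy_id: str,
    *,
    status: str | None = None,
    valid_actions: list[str] | None = None,
) -> dict[str, Any]:
    resp: dict[str, Any] = {"media_buy_id": media_buy_id}
    if status is not None:
        resp["status"] = status
        # An explicit valid_actions always wins over the derived one.
        resp["valid_actions"] = (
            actions_for(status) if valid_actions is None else valid_actions
        )
    elif valid_actions is not None:
        resp["valid_actions"] = valid_actions
    return resp


derived = build_update_response("mb-1", status="paused")
explicit = build_update_response("mb-1", status="paused", valid_actions=["cancel"])
bare = build_update_response("mb-1")
unknown = actions_for("not-a-status")
```

When neither status nor valid_actions is supplied, the valid_actions key is omitted from the response entirely rather than set to an empty list.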
def validate_capabilities(handler: Any, capabilities: GetAdcpCapabilitiesResponse) ‑> list[str]-
def validate_capabilities(
    handler: Any,
    capabilities: GetAdcpCapabilitiesResponse,
) -> list[str]:
    """Check that a handler implements the methods required by its declared features.

    Compares the features declared in a capabilities response against
    the handler's method implementations. Returns warnings for features
    that are declared but whose corresponding handler methods are not
    overridden from the base class.

    This is a development-time check — call it at startup to catch
    misconfigurations.

    Args:
        handler: An ADCPHandler instance (or any object with handler methods).
        capabilities: The capabilities response the handler will serve.

    Returns:
        List of warning strings. Empty if everything is consistent.
    """
    # Late import to avoid circular dependency: server.base imports from adcp.types
    # which may transitively import from this module.
    from adcp.server.base import ADCPHandler

    resolver = FeatureResolver(capabilities)
    warnings: list[str] = []

    for feature, handler_methods in FEATURE_HANDLER_MAP.items():
        if not resolver.supports(feature):
            continue

        for method_name in handler_methods:
            if not hasattr(handler, method_name):
                warnings.append(
                    f"Feature '{feature}' is declared but handler has no "
                    f"'{method_name}' method"
                )
                continue

            # Walk MRO to check if any class between the leaf and ADCPHandler
            # overrides the method (handles mixin / intermediate-class patterns).
            if isinstance(handler, ADCPHandler):
                overridden = any(
                    method_name in cls.__dict__
                    for cls in type(handler).__mro__
                    if cls is not ADCPHandler and not issubclass(ADCPHandler, cls)
                )
                if not overridden:
                    warnings.append(
                        f"Feature '{feature}' is declared but '{method_name}' "
                        f"is not overridden from ADCPHandler"
                    )

    return warnings

Check that a handler implements the methods required by its declared features.
Compares the features declared in a capabilities response against the handler's method implementations. Returns warnings for features that are declared but whose corresponding handler methods are not overridden from the base class.
This is a development-time check — call it at startup to catch misconfigurations.
Args
handler- An ADCPHandler instance (or any object with handler methods).
capabilities- The capabilities response the handler will serve.
Returns
List of warning strings. Empty if everything is consistent.
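The override check can be sketched without the library. BaseHandler, the mixin, and the feature map below are hypothetical stand-ins for ADCPHandler and FEATURE_HANDLER_MAP; the generator expression is the same MRO walk used in the source above, and it shows why a method supplied by a mixin still counts as overridden.

```python
class BaseHandler:
    """Stand-in for ADCPHandler: every operation has a default implementation."""

    async def get_products(self, params, context=None):
        return "not supported"

    async def create_media_buy(self, params, context=None):
        return "not supported"


class ProductsMixin:
    """The override lives in a mixin, not in the leaf class."""

    async def get_products(self, params, context=None):
        return {"products": []}


class MySeller(ProductsMixin, BaseHandler):
    pass


# Hypothetical mapping from a declared feature to the methods it requires.
FEATURE_MAP_EXAMPLE = {"media_buy": ["get_products", "create_media_buy"]}


def is_overridden(handler, method_name: str) -> bool:
    # Skip BaseHandler itself and its ancestors (such as object); any other
    # class in the MRO that defines the method counts as an override.
    return any(
        method_name in cls.__dict__
        for cls in type(handler).__mro__
        if cls is not BaseHandler and not issubclass(BaseHandler, cls)
    )


def check(handler, declared_features):
    found = []
    for feature in declared_features:
        for method_name in FEATURE_MAP_EXAMPLE.get(feature, []):
            if not is_overridden(handler, method_name):
                found.append(
                    f"Feature '{feature}' is declared but '{method_name}' "
                    f"is not overridden"
                )
    return found


found_warnings = check(MySeller(), ["media_buy"])
```

Here get_products passes because ProductsMixin defines it, while create_media_buy is reported because only the base class provides it.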
Classes
class ADCPAgentExecutor (handler: ADCPHandler,
test_controller: TestControllerStore | None = None)-
class ADCPAgentExecutor(AgentExecutor):
    """Bridges ADCPHandler methods to the a2a-sdk AgentExecutor interface.

    Incoming A2A messages are parsed to extract the ADCP skill name and
    parameters, dispatched to the matching handler method, and the result
    is published back as A2A Task events.

    Expects the explicit skill invocation format used by A2AAdapter:
        DataPart(data={"skill": "get_products", "parameters": {...}})
    """

    def __init__(
        self,
        handler: ADCPHandler,
        test_controller: TestControllerStore | None = None,
    ) -> None:
        self._handler = handler
        self._tool_callers: dict[str, Any] = {}

        # Build tool callers for all tools this handler supports.
        # Skip comply_test_controller unless the seller passed a
        # TestControllerStore; otherwise we would advertise a skill
        # backed only by the handler's not-supported stub.
        tool_defs = get_tools_for_handler(handler)
        for tool_def in tool_defs:
            name = tool_def["name"]
            if name == "comply_test_controller" and test_controller is None:
                continue
            self._tool_callers[name] = create_tool_caller(handler, name)

        if test_controller is not None:
            self._register_test_controller(test_controller)

    @property
    def supported_skills(self) -> list[str]:
        """List of skill names this executor can handle."""
        return list(self._tool_callers.keys())

    def _register_test_controller(self, store: TestControllerStore) -> None:
        """Register comply_test_controller as a callable skill."""

        async def _call_test_controller(
            params: dict[str, Any], context: ToolContext | None = None
        ) -> Any:
            return await _handle_test_controller(store, params)

        self._tool_callers["comply_test_controller"] = _call_test_controller

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        """Execute an ADCP skill from an incoming A2A message."""
        skill_name, params = self._parse_request(context)

        if skill_name is None:
            await self._send_error(event_queue, context, "No skill specified in message")
            return

        if skill_name not in self._tool_callers:
            await self._send_error(event_queue, context, f"Unknown skill: {skill_name}")
            return

        tool_context = _tool_context_from_request(context)
        try:
            result = await self._tool_callers[skill_name](params, tool_context)
            await self._send_result(event_queue, context, skill_name, result)
        except ADCPError as exc:
            # Application-layer AdCP error (IdempotencyConflictError etc.).
            # Emit a failed task with the adcp_error in a DataPart per
            # transport-errors.mdx §A2A Binding, plus a human-readable text
            # part. The JSON-RPC channel is reserved for transport-level
            # errors (auth rejected, rate-limited pre-dispatch).
            logger.info("AdCP application error for skill %s: %s", skill_name, exc)
            await self._send_adcp_error(event_queue, context, exc)
        except Exception:
            logger.exception("Error executing skill %s", skill_name)
            await self._send_error(event_queue, context, f"Skill execution failed: {skill_name}")

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        """ADCP operations are synchronous; cancellation sets state to canceled."""
        event = _make_task(
            context,
            state=TaskState.canceled,
            message="Task canceled",
        )
        await event_queue.enqueue_event(event)

    # ------------------------------------------------------------------
    # Message parsing
    # ------------------------------------------------------------------

    def _parse_request(self, context: RequestContext) -> tuple[str | None, dict[str, Any]]:
        """Extract skill name and parameters from the A2A message.

        Supports two formats:
        1. Explicit skill invocation via DataPart:
           DataPart(data={"skill": "get_products", "parameters": {...}})
        2. Natural language fallback via TextPart (best-effort parse)
        """
        msg = context.message
        if msg is None or not msg.parts:
            return None, {}

        # Try DataPart first (explicit skill invocation)
        for part in msg.parts:
            inner = part.root if hasattr(part, "root") else part
            if isinstance(inner, DataPart) and isinstance(inner.data, dict):
                skill = inner.data.get("skill")
                params = inner.data.get("parameters", {})
                if skill:
                    return str(skill), params if isinstance(params, dict) else {}

        # Fallback: try to parse TextPart as JSON
        for part in msg.parts:
            inner = part.root if hasattr(part, "root") else part
            if isinstance(inner, TextPart):
                parsed = self._parse_text_request(inner.text)
                if parsed[0] is not None:
                    return parsed

        return None, {}

    def _parse_text_request(self, text: str) -> tuple[str | None, dict[str, Any]]:
        """Best-effort parse of a text request for skill + params."""
        try:
            data = json.loads(text)
            if isinstance(data, dict) and "skill" in data:
                return str(data["skill"]), data.get("parameters", {})
        except (json.JSONDecodeError, TypeError):
            pass
        return None, {}

    # ------------------------------------------------------------------
    # Response helpers
    # ------------------------------------------------------------------

    async def _send_result(
        self,
        event_queue: EventQueue,
        context: RequestContext,
        skill_name: str,
        result: Any,
    ) -> None:
        """Publish a completed task with the skill result."""
        # Normalize result to a JSON-safe dict
        if hasattr(result, "model_dump"):
            data = result.model_dump(mode="json", exclude_none=True)
        elif not isinstance(result, dict):
            data = {"result": result}
        else:
            data = result

        task = _make_task(
            context,
            state=TaskState.completed,
            data=data,
            message=f"Completed {skill_name}",
        )
        await event_queue.enqueue_event(task)

    async def _send_error(
        self,
        event_queue: EventQueue,
        context: RequestContext,
        error_msg: str,
    ) -> None:
        """Publish a failed task."""
        task = _make_task(
            context,
            state=TaskState.failed,
            message=error_msg,
        )
        await event_queue.enqueue_event(task)

    async def _send_adcp_error(
        self,
        event_queue: EventQueue,
        context: RequestContext,
        exc: ADCPError,
    ) -> None:
        """Publish a failed task carrying an AdCP ``adcp_error`` payload.

        Follows transport-errors.mdx §A2A Binding: failed task with
        artifact containing a ``DataPart`` keyed under ``adcp_error``
        plus a terse ``TextPart`` for human/LLM consumption.
        """
        # Derive the spec error code. ADCPTaskError carries a list of codes
        # (e.g. IdempotencyConflictError → IDEMPOTENCY_CONFLICT); fall back
        # to a generic INTERNAL_ERROR when the exception doesn't supply one.
        code = "INTERNAL_ERROR"
        if isinstance(exc, ADCPTaskError) and exc.error_codes:
            code = str(exc.error_codes[0])

        adcp_error: dict[str, Any] = {
            "code": code,
            "message": exc.message,
        }
        recovery = STANDARD_ERROR_CODES.get(code, {}).get("recovery")
        if recovery:
            adcp_error["recovery"] = recovery
        suggestion = getattr(exc, "suggestion", None)
        if suggestion:
            adcp_error["suggestion"] = suggestion

        task = _make_task(
            context,
            state=TaskState.failed,
            data={"adcp_error": adcp_error},
            message=exc.message,
        )
        await event_queue.enqueue_event(task)

Bridges ADCPHandler methods to the a2a-sdk AgentExecutor interface.
Incoming A2A messages are parsed to extract the ADCP skill name and parameters, dispatched to the matching handler method, and the result is published back as A2A Task events.
Expects the explicit skill invocation format used by A2AAdapter: DataPart(data={"skill": "get_products", "parameters": {…}})
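The invocation payload is a plain object with a "skill" name and a "parameters" dict. According to the source above, the same object serialized as JSON is also accepted in a text part as a fallback. The sketch below copies that text-parsing logic into a standalone function; the "brief" parameter is a hypothetical example and not taken from any request schema.

```python
from __future__ import annotations

import json
from typing import Any


def parse_text_request(text: str) -> tuple[str | None, dict[str, Any]]:
    """Best-effort parse, mirroring _parse_text_request in the source above."""
    try:
        data = json.loads(text)
        if isinstance(data, dict) and "skill" in data:
            return str(data["skill"]), data.get("parameters", {})
    except (json.JSONDecodeError, TypeError):
        pass
    # Anything that is not a JSON object with a "skill" key is rejected.
    return None, {}


# What a client would place in the data part (or, serialized, in a text part).
payload = {"skill": "get_products", "parameters": {"brief": "video inventory"}}

parsed = parse_text_request(json.dumps(payload))
free_text = parse_text_request("show me some products")
no_skill = parse_text_request('{"parameters": {}}')
```

Free-form text that is not JSON yields no skill, which the executor reports as a failed task with the message "No skill specified in message".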
Ancestors
- a2a.server.agent_execution.agent_executor.AgentExecutor
- abc.ABC
Instance variables
prop supported_skills : list[str]-
@property
def supported_skills(self) -> list[str]:
    """List of skill names this executor can handle."""
    return list(self._tool_callers.keys())

List of skill names this executor can handle.
Methods
async def cancel(self, context: RequestContext, event_queue: EventQueue) ‑> None-
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
    """ADCP operations are synchronous; cancellation sets state to canceled."""
    event = _make_task(
        context,
        state=TaskState.canceled,
        message="Task canceled",
    )
    await event_queue.enqueue_event(event)

ADCP operations are synchronous; cancellation sets state to canceled.
async def execute(self, context: RequestContext, event_queue: EventQueue) ‑> None-
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
    """Execute an ADCP skill from an incoming A2A message."""
    skill_name, params = self._parse_request(context)

    if skill_name is None:
        await self._send_error(event_queue, context, "No skill specified in message")
        return

    if skill_name not in self._tool_callers:
        await self._send_error(event_queue, context, f"Unknown skill: {skill_name}")
        return

    tool_context = _tool_context_from_request(context)
    try:
        result = await self._tool_callers[skill_name](params, tool_context)
        await self._send_result(event_queue, context, skill_name, result)
    except ADCPError as exc:
        # Application-layer AdCP error (IdempotencyConflictError etc.).
        # Emit a failed task with the adcp_error in a DataPart per
        # transport-errors.mdx §A2A Binding, plus a human-readable text
        # part. The JSON-RPC channel is reserved for transport-level
        # errors (auth rejected, rate-limited pre-dispatch).
        logger.info("AdCP application error for skill %s: %s", skill_name, exc)
        await self._send_adcp_error(event_queue, context, exc)
    except Exception:
        logger.exception("Error executing skill %s", skill_name)
        await self._send_error(event_queue, context, f"Skill execution failed: {skill_name}")

Execute an ADCP skill from an incoming A2A message.
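The dispatch flow can be sketched with a plain dict of async callables. Everything here is a stand-in: the result dicts replace the A2A task events, the two skills are hypothetical, and the separate ADCPError branch is omitted. The point is the order of checks and that an unexpected exception is reported with a generic message rather than its own text.

```python
from __future__ import annotations

import asyncio
from typing import Any


async def get_products(params: dict[str, Any], context: Any = None) -> Any:
    return {"products": []}


async def broken(params: dict[str, Any], context: Any = None) -> Any:
    raise RuntimeError("database unavailable")


# Stand-in for the executor's skill-name -> caller table.
TOOL_CALLERS = {"get_products": get_products, "broken": broken}


async def dispatch(skill_name: str | None, params: dict[str, Any]) -> dict[str, Any]:
    if skill_name is None:
        return {"state": "failed", "message": "No skill specified in message"}
    if skill_name not in TOOL_CALLERS:
        return {"state": "failed", "message": f"Unknown skill: {skill_name}"}
    try:
        result = await TOOL_CALLERS[skill_name](params, None)
    except Exception:
        # The exception detail stays server-side; the caller only learns
        # which skill failed.
        return {"state": "failed", "message": f"Skill execution failed: {skill_name}"}
    return {"state": "completed", "data": result}


async def demo() -> list[dict[str, Any]]:
    return [
        await dispatch("get_products", {}),
        await dispatch("no_such_skill", {}),
        await dispatch("broken", {}),
        await dispatch(None, {}),
    ]


ok, unknown, failed, missing = asyncio.run(demo())
```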
class ADCPHandler-
class ADCPHandler(ABC): """Base class for ADCP operation handlers. Subclass this to implement ADCP operations. All operations have default implementations that return 'not supported', allowing you to implement only the operations your agent supports. For protocol-specific handlers, use: - ContentStandardsHandler: For content standards agents - SponsoredIntelligenceHandler: For sponsored intelligence agents - GovernanceHandler: For governance agents """ _agent_type: str = "this agent" def _not_supported(self, operation: str) -> NotImplementedResponse: """Create a not-supported response that includes the agent type.""" return not_supported(f"{operation} is not supported by {self._agent_type}") # ======================================================================== # Core Catalog Operations # ======================================================================== async def get_products( self, params: GetProductsRequest | dict[str, Any], context: ToolContext | None = None ) -> Any: """Get advertising products. Override this to provide product catalog functionality. """ return self._not_supported("get_products") async def list_creative_formats( self, params: ListCreativeFormatsRequest | dict[str, Any], context: ToolContext | None = None, ) -> Any: """List supported creative formats. Override this to provide creative format information. """ return self._not_supported("list_creative_formats") # ======================================================================== # Creative Operations # ======================================================================== async def sync_creatives( self, params: SyncCreativesRequest | dict[str, Any], context: ToolContext | None = None ) -> Any: """Sync creatives. Override this to handle creative synchronization. """ return self._not_supported("sync_creatives") async def list_creatives( self, params: ListCreativesRequest | dict[str, Any], context: ToolContext | None = None ) -> Any: """List creatives. 
Override this to list synced creatives. """ return self._not_supported("list_creatives") async def build_creative( self, params: BuildCreativeRequest | dict[str, Any], context: ToolContext | None = None ) -> Any: """Build a creative. Override this to build creatives from assets. """ return self._not_supported("build_creative") async def preview_creative( self, params: PreviewCreativeRequest | dict[str, Any], context: ToolContext | None = None ) -> Any: """Preview a creative rendering. Override this to provide creative preview functionality. """ return self._not_supported("preview_creative") async def get_creative_delivery( self, params: GetCreativeDeliveryRequest | dict[str, Any], context: ToolContext | None = None, ) -> Any: """Get creative delivery metrics. Override this to provide functionality. """ return self._not_supported("get_creative_delivery") # ======================================================================== # Media Buy Operations # ======================================================================== async def create_media_buy( self, params: CreateMediaBuyRequest | dict[str, Any], context: ToolContext | None = None ) -> Any: """Create a media buy. Override this to handle media buy creation. """ return self._not_supported("create_media_buy") async def update_media_buy( self, params: UpdateMediaBuyRequest | dict[str, Any], context: ToolContext | None = None ) -> Any: """Update a media buy. Override this to handle media buy updates. """ return self._not_supported("update_media_buy") async def get_media_buy_delivery( self, params: GetMediaBuyDeliveryRequest | dict[str, Any], context: ToolContext | None = None, ) -> Any: """Get media buy delivery metrics. Override this to provide delivery reporting. """ return self._not_supported("get_media_buy_delivery") async def get_media_buys( self, params: GetMediaBuysRequest | dict[str, Any], context: ToolContext | None = None ) -> Any: """Get media buys with status and optional delivery snapshots. 
        Override this to provide media buy listing functionality.
        """
        return self._not_supported("get_media_buys")

    # ========================================================================
    # Signal Operations
    # ========================================================================

    async def get_signals(
        self, params: GetSignalsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get available signals.

        Override this to provide signal catalog.
        """
        return self._not_supported("get_signals")

    async def activate_signal(
        self, params: ActivateSignalRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Activate a signal.

        Override this to handle signal activation.
        """
        return self._not_supported("activate_signal")

    # ========================================================================
    # Feedback Operations
    # ========================================================================

    async def provide_performance_feedback(
        self,
        params: ProvidePerformanceFeedbackRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Provide performance feedback.

        Override this to handle performance feedback ingestion.
        """
        return self._not_supported("provide_performance_feedback")

    # ========================================================================
    # Account Operations
    # ========================================================================

    async def list_accounts(
        self, params: ListAccountsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """List accounts.

        Override this to provide functionality.
        """
        return self._not_supported("list_accounts")

    async def sync_accounts(
        self, params: SyncAccountsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync accounts.

        Override this to provide functionality.
        """
        return self._not_supported("sync_accounts")

    async def get_account_financials(
        self,
        params: GetAccountFinancialsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Get account financials.

        Override this to provide account financial reporting.
        """
        return self._not_supported("get_account_financials")

    async def report_usage(
        self, params: ReportUsageRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Report account usage.

        Override this to ingest account usage.
        """
        return self._not_supported("report_usage")

    # ========================================================================
    # Event Operations
    # ========================================================================

    async def log_event(
        self, params: LogEventRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Log event.

        Override this to provide functionality.
        """
        return self._not_supported("log_event")

    async def sync_event_sources(
        self, params: SyncEventSourcesRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync event sources.

        Override this to provide functionality.
        """
        return self._not_supported("sync_event_sources")

    async def sync_audiences(
        self, params: SyncAudiencesRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync audiences.

        Override this to provide audience synchronization.
        """
        return self._not_supported("sync_audiences")

    async def sync_governance(
        self, params: SyncGovernanceRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync governance agents for accounts.

        Override this to handle governance agent registration.
        """
        return self._not_supported("sync_governance")

    async def sync_catalogs(
        self, params: SyncCatalogsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync catalogs.

        Override this to provide catalog synchronization.
        """
        return self._not_supported("sync_catalogs")

    # ========================================================================
    # V3 Protocol Discovery
    # ========================================================================

    async def get_adcp_capabilities(
        self,
        params: GetAdcpCapabilitiesRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Get ADCP capabilities.

        Override this to advertise your agent's capabilities.
        """
        return self._not_supported("get_adcp_capabilities")

    # ========================================================================
    # V3 Content Standards Operations
    # ========================================================================

    async def create_content_standards(
        self,
        params: CreateContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Create content standards configuration.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("create_content_standards")

    async def get_content_standards(
        self,
        params: GetContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Get content standards configuration.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("get_content_standards")

    async def list_content_standards(
        self,
        params: ListContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """List content standards configurations.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("list_content_standards")

    async def update_content_standards(
        self,
        params: UpdateContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Update content standards configuration.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("update_content_standards")

    async def calibrate_content(
        self, params: CalibrateContentRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Calibrate content against standards.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("calibrate_content")

    async def validate_content_delivery(
        self,
        params: ValidateContentDeliveryRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Validate content delivery against standards.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("validate_content_delivery")

    async def get_media_buy_artifacts(
        self,
        params: GetMediaBuyArtifactsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Get artifacts associated with a media buy.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("get_media_buy_artifacts")

    # ========================================================================
    # V3 Sponsored Intelligence Operations
    # ========================================================================

    async def si_get_offering(
        self, params: SiGetOfferingRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get sponsored intelligence offering.

        Override this in SponsoredIntelligenceHandler subclasses.
        """
        return self._not_supported("si_get_offering")

    async def si_initiate_session(
        self, params: SiInitiateSessionRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Initiate sponsored intelligence session.

        Override this in SponsoredIntelligenceHandler subclasses.
        """
        return self._not_supported("si_initiate_session")

    async def si_send_message(
        self, params: SiSendMessageRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Send message in sponsored intelligence session.

        Override this in SponsoredIntelligenceHandler subclasses.
        """
        return self._not_supported("si_send_message")

    async def si_terminate_session(
        self, params: SiTerminateSessionRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Terminate sponsored intelligence session.

        Override this in SponsoredIntelligenceHandler subclasses.
        """
        return self._not_supported("si_terminate_session")

    # ========================================================================
    # V3 Governance Operations
    # ========================================================================

    async def get_creative_features(
        self,
        params: GetCreativeFeaturesRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Evaluate governance features for a creative.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("get_creative_features")

    async def sync_plans(
        self, params: SyncPlansRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync campaign governance plans.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("sync_plans")

    async def check_governance(
        self, params: CheckGovernanceRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Check an action against campaign governance.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("check_governance")

    async def report_plan_outcome(
        self, params: ReportPlanOutcomeRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Report the outcome of a governed action.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("report_plan_outcome")

    async def get_plan_audit_logs(
        self, params: GetPlanAuditLogsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Retrieve governance audit logs for plans.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("get_plan_audit_logs")

    async def create_property_list(
        self, params: CreatePropertyListRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Create a property list for governance filtering.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("create_property_list")

    async def get_property_list(
        self, params: GetPropertyListRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get a property list with optional resolution.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("get_property_list")

    async def list_property_lists(
        self, params: ListPropertyListsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """List property lists.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("list_property_lists")

    async def update_property_list(
        self, params: UpdatePropertyListRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Update a property list.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("update_property_list")

    async def delete_property_list(
        self, params: DeletePropertyListRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Delete a property list.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("delete_property_list")

    # ========================================================================
    # V3 Governance (Collection Lists) Operations
    # ========================================================================

    async def create_collection_list(
        self,
        params: CreateCollectionListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Create a collection list for governance filtering.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("create_collection_list")

    async def get_collection_list(
        self, params: GetCollectionListRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get a collection list with optional resolution.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("get_collection_list")

    async def list_collection_lists(
        self,
        params: ListCollectionListsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """List collection lists.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("list_collection_lists")

    async def update_collection_list(
        self,
        params: UpdateCollectionListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Update a collection list.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("update_collection_list")

    async def delete_collection_list(
        self,
        params: DeleteCollectionListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Delete a collection list.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("delete_collection_list")

    # ========================================================================
    # V3 TMP Operations
    # ========================================================================

    async def context_match(
        self, params: ContextMatchRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Match ad context to buyer packages.

        Override this to provide TMP context matching.
        """
        return self._not_supported("context_match")

    async def identity_match(
        self, params: IdentityMatchRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Match user identity for package eligibility.

        Override this to provide TMP identity matching.
        """
        return self._not_supported("identity_match")

    # ========================================================================
    # V3 Brand Rights Operations
    # ========================================================================

    async def get_brand_identity(
        self, params: GetBrandIdentityRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get brand identity information.

        Override this in BrandHandler subclasses.
        """
        return self._not_supported("get_brand_identity")

    async def get_rights(
        self, params: GetRightsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get available rights for licensing.

        Override this in BrandHandler subclasses.
        """
        return self._not_supported("get_rights")

    async def acquire_rights(
        self, params: AcquireRightsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Acquire rights for brand content usage.

        Override this in BrandHandler subclasses.
        """
        return self._not_supported("acquire_rights")

    async def update_rights(
        self, params: UpdateRightsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Update terms of an existing rights acquisition.

        Override this in BrandHandler subclasses.

        Partial update: the request carries ``rights_id`` plus any subset
        of the mutable fields (``end_date``, ``impression_cap``,
        ``pricing_option_id``, ``paused``).

        Seller responsibilities you own when implementing this:

        * Reject updates on expired or revoked acquisitions with an
          appropriate error code — do not partial-commit.
        * Reject ``pricing_option_id`` swaps to incompatible options —
          the new option's terms must be a strict superset of the
          original.
        * Apply all accepted fields atomically — callers should never
          observe a half-applied update on failure.
        """
        return self._not_supported("update_rights")

    # ========================================================================
    # V3 Compliance Operations
    # ========================================================================

    async def comply_test_controller(
        self,
        params: ComplyTestControllerRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Compliance test controller (sandbox only).

        Override this in ComplianceHandler subclasses.
        """
        return self._not_supported("comply_test_controller")

Base class for ADCP operation handlers.
Subclass this to implement ADCP operations. All operations have default implementations that return 'not supported', allowing you to implement only the operations your agent supports.
For protocol-specific handlers, use:
- ContentStandardsHandler: For content standards agents
- SponsoredIntelligenceHandler: For sponsored intelligence agents
- GovernanceHandler: For governance agents
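The subclassing pattern described above can be sketched as follows. To keep the snippet runnable without the adcp package installed, `BaseHandler` is a hypothetical stand-in for ADCPHandler, and the dict returned by its `_not_supported` is an assumed shape, not the framework's actual error payload.

```python
import asyncio
from typing import Any


class BaseHandler:
    """Hypothetical stand-in for ADCPHandler (assumption, not the real class)."""

    def _not_supported(self, operation: str) -> dict[str, Any]:
        # Assumed response shape; the real framework may return something else.
        return {"error": "not_supported", "operation": operation}

    async def get_products(self, params: dict[str, Any], context: Any = None) -> Any:
        return self._not_supported("get_products")

    async def get_signals(self, params: dict[str, Any], context: Any = None) -> Any:
        return self._not_supported("get_signals")


class MySeller(BaseHandler):
    """Implements only the operation this agent supports."""

    async def get_products(self, params: dict[str, Any], context: Any = None) -> Any:
        # Illustrative payload only; real responses follow the ADCP schemas.
        return {"products": [{"product_id": "homepage-banner"}]}


async def main() -> tuple[Any, Any]:
    seller = MySeller()
    # The overridden operation runs the subclass logic.
    products = await seller.get_products({})
    # The untouched operation falls back to the inherited default.
    signals = await seller.get_signals({})
    return products, signals


products, signals = asyncio.run(main())
print(products)
print(signals)
```

Only `get_products` is overridden here, so a call to `get_signals` still reaches the inherited default and reports the operation as unsupported.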
Ancestors
- abc.ABC
Subclasses
- BrandHandler
- ComplianceHandler
- ContentStandardsHandler
- GovernanceHandler
- SponsoredIntelligenceHandler
- TmpHandler
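For BrandHandler implementers, the `update_rights` docstring in the class source above asks for two things: reject updates on expired or revoked acquisitions, and apply accepted fields atomically. A minimal sketch of that validate-then-commit pattern follows. The in-memory `ACQUISITIONS` store, the `apply_rights_update` helper, the record layout, and every error code are hypothetical, and the `pricing_option_id` compatibility check is left out for brevity.

```python
from copy import deepcopy
from typing import Any

# Hypothetical in-memory store of rights acquisitions (illustration only).
ACQUISITIONS: dict[str, dict[str, Any]] = {
    "r-1": {"status": "active", "impression_cap": 1000, "paused": False},
    "r-2": {"status": "expired", "impression_cap": 500, "paused": False},
}

# Mutable fields named in the update_rights docstring.
MUTABLE_FIELDS = {"end_date", "impression_cap", "pricing_option_id", "paused"}


def apply_rights_update(request: dict[str, Any]) -> dict[str, Any]:
    """Validate the whole request first, then commit every field in one step."""
    rights_id = request["rights_id"]
    current = ACQUISITIONS.get(rights_id)
    if current is None:
        return {"error": "RIGHTS_NOT_FOUND"}  # assumed error code
    if current["status"] in {"expired", "revoked"}:
        return {"error": "RIGHTS_NOT_UPDATABLE"}  # assumed error code

    changes = {k: v for k, v in request.items() if k != "rights_id"}
    unknown = set(changes) - MUTABLE_FIELDS
    if unknown:
        return {"error": "INVALID_FIELDS", "fields": sorted(unknown)}  # assumed

    # Build the new record on a copy so a failure above never leaves
    # a half-applied state in the store.
    updated = deepcopy(current)
    updated.update(changes)
    ACQUISITIONS[rights_id] = updated  # single commit point
    return {"rights_id": rights_id, **updated}


ok = apply_rights_update({"rights_id": "r-1", "paused": True, "impression_cap": 2000})
rejected = apply_rights_update({"rights_id": "r-2", "paused": True})
print(ok)
print(rejected)
```

All validation happens before the single assignment into the store, so a rejected request leaves the stored record untouched.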
Methods
async def acquire_rights(self,
params: AcquireRightsRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def acquire_rights(
        self, params: AcquireRightsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Acquire rights for brand content usage.

        Override this in BrandHandler subclasses.
        """
        return self._not_supported("acquire_rights")
Acquire rights for brand content usage.
Override this in BrandHandler subclasses.
async def activate_signal(self,
params: ActivateSignalRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def activate_signal(
        self, params: ActivateSignalRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Activate a signal.

        Override this to handle signal activation.
        """
        return self._not_supported("activate_signal")
Activate a signal.
Override this to handle signal activation.
async def build_creative(self,
params: BuildCreativeRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def build_creative(
        self, params: BuildCreativeRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Build a creative.

        Override this to build creatives from assets.
        """
        return self._not_supported("build_creative")
Build a creative.
Override this to build creatives from assets.
async def calibrate_content(self,
params: CalibrateContentRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def calibrate_content(
        self, params: CalibrateContentRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Calibrate content against standards.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("calibrate_content")
Calibrate content against standards.
Override this in ContentStandardsHandler subclasses.
async def check_governance(self,
params: CheckGovernanceRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def check_governance(
        self, params: CheckGovernanceRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Check an action against campaign governance.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("check_governance")
Check an action against campaign governance.
Override this in GovernanceHandler subclasses.
async def comply_test_controller(self,
params: ComplyTestControllerRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def comply_test_controller(
        self,
        params: ComplyTestControllerRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Compliance test controller (sandbox only).

        Override this in ComplianceHandler subclasses.
        """
        return self._not_supported("comply_test_controller")
Compliance test controller (sandbox only).
Override this in ComplianceHandler subclasses.
async def context_match(self,
params: ContextMatchRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def context_match(
        self, params: ContextMatchRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Match ad context to buyer packages.

        Override this to provide TMP context matching.
        """
        return self._not_supported("context_match")
Match ad context to buyer packages.
Override this to provide TMP context matching.
async def create_collection_list(self,
params: CreateCollectionListRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def create_collection_list(
        self,
        params: CreateCollectionListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Create a collection list for governance filtering.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("create_collection_list")
Create a collection list for governance filtering.
Override this in GovernanceHandler subclasses.
async def create_content_standards(self,
params: CreateContentStandardsRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def create_content_standards(
        self,
        params: CreateContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Create content standards configuration.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("create_content_standards")
Create content standards configuration.
Override this in ContentStandardsHandler subclasses.
async def create_media_buy(self,
params: CreateMediaBuyRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def create_media_buy(
        self, params: CreateMediaBuyRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Create a media buy.

        Override this to handle media buy creation.
        """
        return self._not_supported("create_media_buy")
Create a media buy.
Override this to handle media buy creation.
async def create_property_list(self,
params: CreatePropertyListRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def create_property_list(
        self, params: CreatePropertyListRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Create a property list for governance filtering.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("create_property_list")
Create a property list for governance filtering.
Override this in GovernanceHandler subclasses.
async def delete_collection_list(self,
params: DeleteCollectionListRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def delete_collection_list(
        self,
        params: DeleteCollectionListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Delete a collection list.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("delete_collection_list")
Delete a collection list.
Override this in GovernanceHandler subclasses.
async def delete_property_list(self,
params: DeletePropertyListRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def delete_property_list(
        self, params: DeletePropertyListRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Delete a property list.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("delete_property_list")
Delete a property list.
Override this in GovernanceHandler subclasses.
async def get_account_financials(self,
params: GetAccountFinancialsRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def get_account_financials(
        self,
        params: GetAccountFinancialsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Get account financials.

        Override this to provide account financial reporting.
        """
        return self._not_supported("get_account_financials")
Get account financials.
Override this to provide account financial reporting.
async def get_adcp_capabilities(self,
params: GetAdcpCapabilitiesRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def get_adcp_capabilities(
        self,
        params: GetAdcpCapabilitiesRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Get ADCP capabilities.

        Override this to advertise your agent's capabilities.
        """
        return self._not_supported("get_adcp_capabilities")
Get ADCP capabilities.
Override this to advertise your agent's capabilities.
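One way to decide what to advertise is to report only the operations a subclass actually overrides. The sketch below illustrates that idea and is not the framework's implementation: `BaseHandler`, `OPERATIONS`, and `overridden_operations` are hypothetical names, and the returned dict is an assumed shape rather than the real capabilities schema.

```python
import asyncio
from typing import Any


class BaseHandler:
    """Hypothetical stand-in for ADCPHandler (assumption)."""

    async def get_products(self, params: Any, context: Any = None) -> Any:
        return {"error": "not_supported"}

    async def create_media_buy(self, params: Any, context: Any = None) -> Any:
        return {"error": "not_supported"}

    async def get_signals(self, params: Any, context: Any = None) -> Any:
        return {"error": "not_supported"}


# Operation names to inspect (a small illustrative subset).
OPERATIONS = ("get_products", "create_media_buy", "get_signals")


def overridden_operations(handler: BaseHandler) -> list[str]:
    """Return the operations whose implementation differs from the base class."""
    cls = type(handler)
    return [
        name
        for name in OPERATIONS
        # An inherited method is the very same function object as the base one.
        if getattr(cls, name) is not getattr(BaseHandler, name)
    ]


class MySeller(BaseHandler):
    async def get_products(self, params: Any, context: Any = None) -> Any:
        return {"products": []}

    async def get_adcp_capabilities(self, params: Any, context: Any = None) -> Any:
        # Advertise only what this subclass really implements.
        return {"supported_operations": overridden_operations(self)}


capabilities = asyncio.run(MySeller().get_adcp_capabilities({}))
print(capabilities)
```

Comparing function objects on the class, rather than on the instance, avoids the fresh bound-method objects that instance attribute access creates on every lookup.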
async def get_brand_identity(self,
params: GetBrandIdentityRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def get_brand_identity(
        self, params: GetBrandIdentityRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get brand identity information.

        Override this in BrandHandler subclasses.
        """
        return self._not_supported("get_brand_identity")
Get brand identity information.
Override this in BrandHandler subclasses.
async def get_collection_list(self,
params: GetCollectionListRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def get_collection_list(
        self, params: GetCollectionListRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get a collection list with optional resolution.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("get_collection_list")
Get a collection list with optional resolution.
Override this in GovernanceHandler subclasses.
async def get_content_standards(self,
params: GetContentStandardsRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def get_content_standards(
        self,
        params: GetContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Get content standards configuration.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("get_content_standards")
Get content standards configuration.
Override this in ContentStandardsHandler subclasses.
async def get_creative_delivery(self,
params: GetCreativeDeliveryRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def get_creative_delivery(
        self,
        params: GetCreativeDeliveryRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Get creative delivery metrics.

        Override this to provide functionality.
        """
        return self._not_supported("get_creative_delivery")
Get creative delivery metrics.
Override this to provide functionality.
async def get_creative_features(self,
params: GetCreativeFeaturesRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def get_creative_features(
        self,
        params: GetCreativeFeaturesRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Evaluate governance features for a creative.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("get_creative_features")
Evaluate governance features for a creative.
Override this in GovernanceHandler subclasses.
async def get_media_buy_artifacts(self,
params: GetMediaBuyArtifactsRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def get_media_buy_artifacts(
        self,
        params: GetMediaBuyArtifactsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Get artifacts associated with a media buy.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("get_media_buy_artifacts")
Get artifacts associated with a media buy.
Override this in ContentStandardsHandler subclasses.
async def get_media_buy_delivery(self,
params: GetMediaBuyDeliveryRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def get_media_buy_delivery(
        self,
        params: GetMediaBuyDeliveryRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Get media buy delivery metrics.

        Override this to provide delivery reporting.
        """
        return self._not_supported("get_media_buy_delivery")
Get media buy delivery metrics.
Override this to provide delivery reporting.
async def get_media_buys(self,
params: GetMediaBuysRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def get_media_buys(
        self, params: GetMediaBuysRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get media buys with status and optional delivery snapshots.

        Override this to provide media buy listing functionality.
        """
        return self._not_supported("get_media_buys")
Get media buys with status and optional delivery snapshots.
Override this to provide media buy listing functionality.
async def get_plan_audit_logs(self,
params: GetPlanAuditLogsRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def get_plan_audit_logs(
        self, params: GetPlanAuditLogsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Retrieve governance audit logs for plans.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("get_plan_audit_logs")
Retrieve governance audit logs for plans.
Override this in GovernanceHandler subclasses.
async def get_products(self,
params: GetProductsRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def get_products(
        self, params: GetProductsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get advertising products.

        Override this to provide product catalog functionality.
        """
        return self._not_supported("get_products")
Get advertising products.
Override this to provide product catalog functionality.
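Like every operation on this class, `get_products` accepts `params` either as a typed request model or as a plain `dict[str, Any]`, so an override often starts by normalizing the two. A minimal sketch, assuming the request models expose a Pydantic v2 style `model_dump()` method (an assumption, not confirmed by this page); `FakeGetProductsRequest`, its `brief` field, and `as_dict` are hypothetical names used only for illustration.

```python
from typing import Any


class FakeGetProductsRequest:
    """Hypothetical stand-in for a request model with a Pydantic-like API."""

    def __init__(self, brief: str) -> None:
        self.brief = brief

    def model_dump(self) -> dict[str, Any]:
        return {"brief": self.brief}


def as_dict(params: Any) -> dict[str, Any]:
    """Normalize a request model or a plain dict into a dict."""
    if isinstance(params, dict):
        return params
    if hasattr(params, "model_dump"):
        return params.model_dump()
    raise TypeError(f"unsupported params type: {type(params).__name__}")


from_model = as_dict(FakeGetProductsRequest(brief="sports fans"))
from_dict = as_dict({"brief": "sports fans"})
print(from_model == from_dict)
```

With a helper like this, the rest of the handler can read fields the same way regardless of how the transport delivered the request.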
async def get_property_list(self,
params: GetPropertyListRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def get_property_list(
        self, params: GetPropertyListRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get a property list with optional resolution.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("get_property_list")
Get a property list with optional resolution.
Override this in GovernanceHandler subclasses.
async def get_rights(self,
params: GetRightsRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def get_rights(
        self, params: GetRightsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get available rights for licensing.

        Override this in BrandHandler subclasses.
        """
        return self._not_supported("get_rights")
Get available rights for licensing.
Override this in BrandHandler subclasses.
async def get_signals(self,
params: GetSignalsRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def get_signals(
        self, params: GetSignalsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get available signals.

        Override this to provide signal catalog.
        """
        return self._not_supported("get_signals")
Get available signals.
Override this to provide signal catalog.
async def identity_match(self,
params: IdentityMatchRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def identity_match(
        self, params: IdentityMatchRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Match user identity for package eligibility.

        Override this to provide TMP identity matching.
        """
        return self._not_supported("identity_match")
Match user identity for package eligibility.
Override this to provide TMP identity matching.
async def list_accounts(self,
params: ListAccountsRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def list_accounts(
        self, params: ListAccountsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """List accounts.

        Override this to provide functionality.
        """
        return self._not_supported("list_accounts")
List accounts.
Override this to provide functionality.
async def list_collection_lists(self,
params: ListCollectionListsRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def list_collection_lists(
        self,
        params: ListCollectionListsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """List collection lists.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("list_collection_lists")
List collection lists.
Override this in GovernanceHandler subclasses.
async def list_content_standards(self,
params: ListContentStandardsRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def list_content_standards(
        self,
        params: ListContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """List content standards configurations.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("list_content_standards")
List content standards configurations.
Override this in ContentStandardsHandler subclasses.
async def list_creative_formats(self,
params: ListCreativeFormatsRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def list_creative_formats(
        self,
        params: ListCreativeFormatsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """List supported creative formats.

        Override this to provide creative format information.
        """
        return self._not_supported("list_creative_formats")
List supported creative formats.
Override this to provide creative format information.
async def list_creatives(self,
params: ListCreativesRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def list_creatives(
        self, params: ListCreativesRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """List creatives.

        Override this to list synced creatives.
        """
        return self._not_supported("list_creatives")
List creatives.
Override this to list synced creatives.
async def list_property_lists(self,
params: ListPropertyListsRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def list_property_lists(
        self, params: ListPropertyListsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """List property lists.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("list_property_lists")
List property lists.
Override this in GovernanceHandler subclasses.
async def log_event(self,
params: LogEventRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def log_event(
        self, params: LogEventRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Log event.

        Override this to provide functionality.
        """
        return self._not_supported("log_event")
Log event.
Override this to provide functionality.
async def preview_creative(self,
params: PreviewCreativeRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def preview_creative(
        self, params: PreviewCreativeRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Preview a creative rendering.

        Override this to provide creative preview functionality.
        """
        return self._not_supported("preview_creative")
Preview a creative rendering.
Override this to provide creative preview functionality.
async def provide_performance_feedback(self,
params: ProvidePerformanceFeedbackRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def provide_performance_feedback(
        self,
        params: ProvidePerformanceFeedbackRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Provide performance feedback.

        Override this to handle performance feedback ingestion.
        """
        return self._not_supported("provide_performance_feedback")
Provide performance feedback.
Override this to handle performance feedback ingestion.
async def report_plan_outcome(self,
params: ReportPlanOutcomeRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def report_plan_outcome(
        self, params: ReportPlanOutcomeRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Report the outcome of a governed action.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("report_plan_outcome")
Report the outcome of a governed action.
Override this in GovernanceHandler subclasses.
async def report_usage(self,
params: ReportUsageRequest | dict[str, Any],
context: ToolContext | None = None) -> Any
    async def report_usage(
        self, params: ReportUsageRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Report account usage.

        Override this to ingest account usage.
        """
        return self._not_supported("report_usage")
Report account usage.
Override this to ingest account usage.
async def si_get_offering(self,
params: SiGetOfferingRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def si_get_offering(
        self, params: SiGetOfferingRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get sponsored intelligence offering.

        Override this in SponsoredIntelligenceHandler subclasses.
        """
        return self._not_supported("si_get_offering")

Get sponsored intelligence offering.
Override this in SponsoredIntelligenceHandler subclasses.
async def si_initiate_session(self,
params: SiInitiateSessionRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def si_initiate_session(
        self, params: SiInitiateSessionRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Initiate sponsored intelligence session.

        Override this in SponsoredIntelligenceHandler subclasses.
        """
        return self._not_supported("si_initiate_session")

Initiate sponsored intelligence session.
Override this in SponsoredIntelligenceHandler subclasses.
async def si_send_message(self,
params: SiSendMessageRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def si_send_message(
        self, params: SiSendMessageRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Send message in sponsored intelligence session.

        Override this in SponsoredIntelligenceHandler subclasses.
        """
        return self._not_supported("si_send_message")

Send message in sponsored intelligence session.
Override this in SponsoredIntelligenceHandler subclasses.
async def si_terminate_session(self,
params: SiTerminateSessionRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def si_terminate_session(
        self, params: SiTerminateSessionRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Terminate sponsored intelligence session.

        Override this in SponsoredIntelligenceHandler subclasses.
        """
        return self._not_supported("si_terminate_session")

Terminate sponsored intelligence session.
Override this in SponsoredIntelligenceHandler subclasses.
async def sync_accounts(self,
params: SyncAccountsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def sync_accounts(
        self, params: SyncAccountsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync accounts.

        Override this to provide functionality.
        """
        return self._not_supported("sync_accounts")

Sync accounts.
Override this to provide functionality.
async def sync_audiences(self,
params: SyncAudiencesRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def sync_audiences(
        self, params: SyncAudiencesRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync audiences.

        Override this to provide audience synchronization.
        """
        return self._not_supported("sync_audiences")

Sync audiences.
Override this to provide audience synchronization.
async def sync_catalogs(self,
params: SyncCatalogsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def sync_catalogs(
        self, params: SyncCatalogsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync catalogs.

        Override this to provide catalog synchronization.
        """
        return self._not_supported("sync_catalogs")

Sync catalogs.
Override this to provide catalog synchronization.
async def sync_creatives(self,
params: SyncCreativesRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def sync_creatives(
        self, params: SyncCreativesRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync creatives.

        Override this to handle creative synchronization.
        """
        return self._not_supported("sync_creatives")

Sync creatives.
Override this to handle creative synchronization.
async def sync_event_sources(self,
params: SyncEventSourcesRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def sync_event_sources(
        self, params: SyncEventSourcesRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync event sources.

        Override this to provide functionality.
        """
        return self._not_supported("sync_event_sources")

Sync event sources.
Override this to provide functionality.
async def sync_governance(self,
params: SyncGovernanceRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def sync_governance(
        self, params: SyncGovernanceRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync governance agents for accounts.

        Override this to handle governance agent registration.
        """
        return self._not_supported("sync_governance")

Sync governance agents for accounts.
Override this to handle governance agent registration.
async def sync_plans(self,
params: SyncPlansRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def sync_plans(
        self, params: SyncPlansRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync campaign governance plans.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("sync_plans")

Sync campaign governance plans.
Override this in GovernanceHandler subclasses.
async def update_collection_list(self,
params: UpdateCollectionListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def update_collection_list(
        self,
        params: UpdateCollectionListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Update a collection list.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("update_collection_list")

Update a collection list.
Override this in GovernanceHandler subclasses.
async def update_content_standards(self,
params: UpdateContentStandardsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def update_content_standards(
        self,
        params: UpdateContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Update content standards configuration.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("update_content_standards")

Update content standards configuration.
Override this in ContentStandardsHandler subclasses.
async def update_media_buy(self,
params: UpdateMediaBuyRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def update_media_buy(
        self, params: UpdateMediaBuyRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Update a media buy.

        Override this to handle media buy updates.
        """
        return self._not_supported("update_media_buy")

Update a media buy.
Override this to handle media buy updates.
async def update_property_list(self,
params: UpdatePropertyListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def update_property_list(
        self, params: UpdatePropertyListRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Update a property list.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("update_property_list")

Update a property list.
Override this in GovernanceHandler subclasses.
async def update_rights(self,
params: UpdateRightsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def update_rights(
        self, params: UpdateRightsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Update terms of an existing rights acquisition.

        Override this in BrandHandler subclasses. Partial update: the
        request carries ``rights_id`` plus any subset of the mutable
        fields (``end_date``, ``impression_cap``, ``pricing_option_id``,
        ``paused``).

        Seller responsibilities you own when implementing this:

        * Reject updates on expired or revoked acquisitions with an
          appropriate error code — do not partial-commit.
        * Reject ``pricing_option_id`` swaps to incompatible options —
          the new option's terms must be a strict superset of the
          original.
        * Apply all accepted fields atomically — callers should never
          observe a half-applied update on failure.
        """
        return self._not_supported("update_rights")

Update terms of an existing rights acquisition.
Override this in BrandHandler subclasses. Partial update: the request carries rights_id plus any subset of the mutable fields (end_date, impression_cap, pricing_option_id, paused).
Seller responsibilities you own when implementing this:
- Reject updates on expired or revoked acquisitions with an appropriate error code — do not partial-commit.
- Reject pricing_option_id swaps to incompatible options — the new option's terms must be a strict superset of the original.
- Apply all accepted fields atomically — callers should never observe a half-applied update on failure.
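The three seller responsibilities listed for update_rights can be sketched as a validate-first, apply-once helper. This is only an illustration under stated assumptions: the dict-shaped acquisition record, the "expired"/"revoked" status strings, the RightsUpdateError class, its error code strings, and the precomputed compatible_options set are all hypothetical and are not part of the adcp package.

```python
from copy import deepcopy
from typing import Any

# Mutable fields named in the update_rights docstring.
MUTABLE_FIELDS = {"end_date", "impression_cap", "pricing_option_id", "paused"}


class RightsUpdateError(Exception):
    """Hypothetical error type; the code strings used below are illustrative."""

    def __init__(self, code: str, message: str) -> None:
        super().__init__(message)
        self.code = code


def apply_rights_update(
    acquisition: dict[str, Any],
    update: dict[str, Any],
    compatible_options: set[str],
) -> dict[str, Any]:
    """Validate every requested change first, then return an updated copy."""
    # 1. Refuse to touch expired or revoked acquisitions.
    if acquisition["status"] in {"expired", "revoked"}:
        raise RightsUpdateError("RIGHTS_NOT_UPDATABLE", "acquisition is expired or revoked")

    changes = {key: value for key, value in update.items() if key != "rights_id"}
    unknown = set(changes) - MUTABLE_FIELDS
    if unknown:
        raise RightsUpdateError("INVALID_FIELD", f"not mutable: {sorted(unknown)}")

    # 2. Refuse swaps to pricing options the caller has not marked compatible.
    new_option = changes.get("pricing_option_id")
    if new_option is not None and new_option not in compatible_options:
        raise RightsUpdateError("INCOMPATIBLE_PRICING_OPTION", str(new_option))

    # 3. All checks passed: build the new record in one step, so the original
    #    is never left half-updated when a check fails.
    updated = deepcopy(acquisition)
    updated.update(changes)
    return updated


current = {
    "rights_id": "r1",
    "status": "active",
    "paused": False,
    "impression_cap": 1000,
    "pricing_option_id": "basic",
}
result = apply_rights_update(
    current,
    {"rights_id": "r1", "paused": True, "impression_cap": 5000},
    {"basic", "premium"},
)
```

Because the helper returns a new record instead of mutating the stored one, a failed check leaves `current` exactly as it was; persisting `result` in a single write (or transaction) is left to the caller.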
async def validate_content_delivery(self,
params: ValidateContentDeliveryRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any-
    async def validate_content_delivery(
        self,
        params: ValidateContentDeliveryRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Validate content delivery against standards.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("validate_content_delivery")

Validate content delivery against standards.
Override this in ContentStandardsHandler subclasses.
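Every ADCPHandler method listed above follows the same shape: the base implementation returns `self._not_supported(...)`, and a subclass overrides only the tasks it serves. The sketch below imitates that pattern with a stand-in base class so it runs without the adcp package; `BaseHandler`, `CreativeOnlyAgent`, and the dict returned by the stand-in `_not_supported` are assumptions, since the real helper's return value is not shown in this section.

```python
import asyncio
from typing import Any


class BaseHandler:
    """Stand-in for ADCPHandler; not the real class."""

    def _not_supported(self, task: str) -> dict[str, Any]:
        # Hypothetical payload: the real helper's return value is not documented here.
        return {"supported": False, "reason": f"{task} is not supported"}

    async def sync_creatives(self, params: dict[str, Any], context: Any = None) -> Any:
        return self._not_supported("sync_creatives")

    async def update_media_buy(self, params: dict[str, Any], context: Any = None) -> Any:
        return self._not_supported("update_media_buy")


class CreativeOnlyAgent(BaseHandler):
    """Overrides one task; every other task keeps the default stub."""

    async def sync_creatives(self, params: dict[str, Any], context: Any = None) -> Any:
        creatives = params.get("creatives", [])
        return {"synced": len(creatives)}


async def main() -> tuple[Any, Any]:
    agent = CreativeOnlyAgent()
    overridden = await agent.sync_creatives({"creatives": [{"id": "c1"}, {"id": "c2"}]})
    inherited = await agent.update_media_buy({"media_buy_id": "mb1"})
    return overridden, inherited


overridden, inherited = asyncio.run(main())
```

Here `overridden` comes from the subclass, while `inherited` is still the default "not supported" payload.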
class ADCPServerBuilder (name: str, *, version: str = '1.0.0')-
    class ADCPServerBuilder:
        """Declarative server builder using decorators.

        Use ``adcp_server()`` to create an instance, then register handlers
        with decorators. The builder can be passed directly to ``serve()``.

        Example::

            server = adcp_server("my-seller")

            @server.get_products
            async def get_products(params, context=None):
                return products_response(MY_PRODUCTS)

            serve(server, name="my-seller")
        """

        def __init__(self, name: str, *, version: str = "1.0.0") -> None:
            self.name = name
            self.version = version
            self._handlers: dict[str, Callable[..., Any]] = {}

        def __getattr__(self, task_name: str) -> Callable[..., Any]:
            """Return a decorator that registers a handler for the given task."""
            if task_name.startswith("_"):
                raise AttributeError(task_name)

            def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
                if (
                    task_name not in HANDLER_TO_DOMAIN
                    and task_name != "get_adcp_capabilities"
                ):
                    raise ValueError(
                        f"'{task_name}' is not a known ADCP task. "
                        f"Check for typos."
                    )
                self._handlers[task_name] = fn
                return fn

            return decorator

        def _detect_domains(self) -> list[str]:
            """Detect which ADCP domains the registered handlers cover."""
            domains: set[str] = set()
            for handler_name in self._handlers:
                domain = HANDLER_TO_DOMAIN.get(handler_name)
                if domain:
                    domains.add(domain)
            return sorted(domains)

        def build_handler(self) -> ADCPHandler:
            """Build an ADCPHandler from registered decorators.

            If ``get_adcp_capabilities`` is not registered, it will be
            auto-generated from the detected domains.
            """
            handlers = dict(self._handlers)

            # Auto-generate capabilities if not provided
            if "get_adcp_capabilities" not in handlers:
                domains = self._detect_domains()
                if domains:
                    from adcp.server.responses import capabilities_response

                    async def auto_capabilities(
                        params: Any, context: Any = None
                    ) -> dict[str, Any]:
                        return capabilities_response(domains)

                    handlers["get_adcp_capabilities"] = auto_capabilities

            # Create a dynamic subclass
            class DynamicHandler(ADCPHandler):
                pass

            for task_name, fn in handlers.items():
                # Wrap standalone functions to accept self
                async def _bound_method(
                    self: Any,
                    params: Any,
                    context: Any = None,
                    _fn: Callable[..., Any] = fn,
                ) -> Any:
                    return await _fn(params, context)

                setattr(DynamicHandler, task_name, _bound_method)

            return DynamicHandler()

Declarative server builder using decorators.
Use adcp_server() to create an instance, then register handlers with decorators. The builder can be passed directly to serve().
Example::
    server = adcp_server("my-seller")

    @server.get_products
    async def get_products(params, context=None):
        return products_response(MY_PRODUCTS)

    serve(server, name="my-seller")
Methods
def build_handler(self) ‑> ADCPHandler-
    def build_handler(self) -> ADCPHandler:
        """Build an ADCPHandler from registered decorators.

        If ``get_adcp_capabilities`` is not registered, it will be
        auto-generated from the detected domains.
        """
        handlers = dict(self._handlers)

        # Auto-generate capabilities if not provided
        if "get_adcp_capabilities" not in handlers:
            domains = self._detect_domains()
            if domains:
                from adcp.server.responses import capabilities_response

                async def auto_capabilities(
                    params: Any, context: Any = None
                ) -> dict[str, Any]:
                    return capabilities_response(domains)

                handlers["get_adcp_capabilities"] = auto_capabilities

        # Create a dynamic subclass
        class DynamicHandler(ADCPHandler):
            pass

        for task_name, fn in handlers.items():
            # Wrap standalone functions to accept self
            async def _bound_method(
                self: Any,
                params: Any,
                context: Any = None,
                _fn: Callable[..., Any] = fn,
            ) -> Any:
                return await _fn(params, context)

            setattr(DynamicHandler, task_name, _bound_method)

        return DynamicHandler()

Build an ADCPHandler from registered decorators.
If get_adcp_capabilities is not registered, it will be auto-generated from the detected domains.
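The builder's source relies on two Python idioms: `__getattr__` turns any attribute name into a registering decorator, and build_handler binds each function through a default argument (`_fn=fn`) so that every generated method calls its own handler, not the last one seen in the loop. A minimal standalone re-creation, assuming a hypothetical `MiniBuilder` class and a hard-coded `KNOWN_TASKS` set in place of `HANDLER_TO_DOMAIN`, might look like this:

```python
import asyncio
from typing import Any, Callable

# Stand-in for the keys of HANDLER_TO_DOMAIN (assumption for this sketch).
KNOWN_TASKS = {"get_products", "create_media_buy"}


class MiniBuilder:
    """Hypothetical, reduced imitation of ADCPServerBuilder."""

    def __init__(self) -> None:
        self._handlers: dict[str, Callable[..., Any]] = {}

    def __getattr__(self, task_name: str) -> Callable[..., Any]:
        # Only reached for names that normal attribute lookup did not find.
        if task_name.startswith("_"):
            raise AttributeError(task_name)

        def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
            if task_name not in KNOWN_TASKS:
                raise ValueError(f"'{task_name}' is not a known task. Check for typos.")
            self._handlers[task_name] = fn
            return fn

        return decorator

    def build(self) -> Any:
        class Dynamic:
            pass

        for task_name, fn in self._handlers.items():
            # The default argument captures the current fn at definition time.
            async def _bound(self: Any, params: Any, context: Any = None,
                             _fn: Callable[..., Any] = fn) -> Any:
                return await _fn(params, context)

            setattr(Dynamic, task_name, _bound)
        return Dynamic()


server = MiniBuilder()


@server.get_products
async def get_products(params: Any, context: Any = None) -> dict[str, Any]:
    return {"products": ["p1"]}


@server.create_media_buy
async def create_media_buy(params: Any, context: Any = None) -> dict[str, Any]:
    return {"media_buy_id": "mb1"}


handler = server.build()
products = asyncio.run(handler.get_products({}))
media_buy = asyncio.run(handler.create_media_buy({}))
```

Without the `_fn=fn` default, both generated methods would close over the loop variable and dispatch to whichever handler was registered last. A misspelled decorator such as `@server.get_prodcts` fails at decoration time with ValueError, which mirrors the typo check in the source above.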
class AccountError (code: str, message: str | None = None, *, suggestion: str | None = None)-
    class AccountError(Exception):
        """Raised by account resolvers to indicate a specific account error.

        Use this in your resolver to return structured errors for cases
        beyond simple "not found"::

            async def my_resolver(ref):
                account = db.find(ref)
                if not account:
                    return None  # auto-returns ACCOUNT_NOT_FOUND
                if account.status == "suspended":
                    raise AccountError("ACCOUNT_SUSPENDED", "Account is suspended")
                if account.status == "payment_required":
                    raise AccountError("ACCOUNT_PAYMENT_REQUIRED",
                        suggestion="Update payment method at https://...")
                return account
        """

        def __init__(
            self,
            code: str,
            message: str | None = None,
            *,
            suggestion: str | None = None,
        ):
            self.code = code
            self.error_message = message
            self.suggestion = suggestion
            super().__init__(message or code)

Raised by account resolvers to indicate a specific account error.
Use this in your resolver to return structured errors for cases beyond simple "not found"::
    async def my_resolver(ref):
        account = db.find(ref)
        if not account:
            return None  # auto-returns ACCOUNT_NOT_FOUND
        if account.status == "suspended":
            raise AccountError("ACCOUNT_SUSPENDED", "Account is suspended")
        if account.status == "payment_required":
            raise AccountError("ACCOUNT_PAYMENT_REQUIRED",
                suggestion="Update payment method at https://...")
        return account
Ancestors
- builtins.Exception
- builtins.BaseException
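To show how the three attributes set by the constructor (`code`, `error_message`, `suggestion`) could be consumed, here is a rough sketch of a caller that turns resolver outcomes into error payloads. The local `AccountError` copy mirrors the constructor shown above so the block runs standalone; `ACCOUNTS`, `resolve_or_error`, and the payload shape are hypothetical and are not the framework's `resolve_account()` helper.

```python
import asyncio
from typing import Any, Optional


class AccountError(Exception):
    """Local copy of the constructor shown above, so the sketch runs standalone."""

    def __init__(self, code: str, message: Optional[str] = None, *,
                 suggestion: Optional[str] = None) -> None:
        self.code = code
        self.error_message = message
        self.suggestion = suggestion
        super().__init__(message or code)


# Hypothetical in-memory account store.
ACCOUNTS = {"acct-1": {"status": "active"}, "acct-2": {"status": "suspended"}}


async def my_resolver(ref: str) -> Optional[dict[str, Any]]:
    account = ACCOUNTS.get(ref)
    if account is None:
        return None  # caller maps this to ACCOUNT_NOT_FOUND
    if account["status"] == "suspended":
        raise AccountError("ACCOUNT_SUSPENDED", "Account is suspended")
    return account


async def resolve_or_error(ref: str) -> dict[str, Any]:
    """Hypothetical caller; the payload shape is an assumption."""
    try:
        account = await my_resolver(ref)
    except AccountError as exc:
        error: dict[str, Any] = {"code": exc.code, "message": exc.error_message or exc.code}
        if exc.suggestion:
            error["suggestion"] = exc.suggestion
        return {"error": error}
    if account is None:
        return {"error": {"code": "ACCOUNT_NOT_FOUND", "message": f"No account for {ref!r}"}}
    return {"account": account}


results = {ref: asyncio.run(resolve_or_error(ref)) for ref in ("acct-1", "acct-2", "missing")}
```

Note that the exception message falls back to the code when no message is given, so `str(AccountError("ACCOUNT_PAYMENT_REQUIRED"))` is the code string itself.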
class BrandHandler-
    class BrandHandler(ADCPHandler):
        """Handler for brand rights operations.

        Subclass this to implement brand identity and rights management.
        Only brand rights tools will be exposed via MCP.

        Example:
            class MyBrandAgent(BrandHandler):
                async def get_brand_identity(self, params, context=None):
                    # Implement brand identity lookup
                    pass
        """

        _agent_type = "Brand agents"

Handler for brand rights operations.
Subclass this to implement brand identity and rights management. Only brand rights tools will be exposed via MCP.
Example
    class MyBrandAgent(BrandHandler):
        async def get_brand_identity(self, params, context=None):
            # Implement brand identity lookup
            pass
Ancestors
- ADCPHandler
- abc.ABC
Inherited members
ADCPHandler: acquire_rights, activate_signal, build_creative, calibrate_content, check_governance, comply_test_controller, context_match, create_collection_list, create_content_standards, create_media_buy, create_property_list, delete_collection_list, delete_property_list, get_account_financials, get_adcp_capabilities, get_brand_identity, get_collection_list, get_content_standards, get_creative_delivery, get_creative_features, get_media_buy_artifacts, get_media_buy_delivery, get_media_buys, get_plan_audit_logs, get_products, get_property_list, get_rights, get_signals, identity_match, list_accounts, list_collection_lists, list_content_standards, list_creative_formats, list_creatives, list_property_lists, log_event, preview_creative, provide_performance_feedback, report_plan_outcome, report_usage, si_get_offering, si_initiate_session, si_send_message, si_terminate_session, sync_accounts, sync_audiences, sync_catalogs, sync_creatives, sync_event_sources, sync_governance, sync_plans, update_collection_list, update_content_standards, update_media_buy, update_property_list, update_rights, validate_content_delivery
class ComplianceHandler-
    class ComplianceHandler(ADCPHandler):
        """Handler for compliance test operations.

        Subclass this to implement compliance sandbox testing.
        Only compliance tools will be exposed via MCP.

        Example:
            class MyComplianceAgent(ComplianceHandler):
                async def comply_test_controller(self, params, context=None):
                    # Implement test controller
                    pass
        """

        _agent_type = "Compliance agents"

Handler for compliance test operations.
Subclass this to implement compliance sandbox testing. Only compliance tools will be exposed via MCP.
Example
    class MyComplianceAgent(ComplianceHandler):
        async def comply_test_controller(self, params, context=None):
            # Implement test controller
            pass
Ancestors
- ADCPHandler
- abc.ABC
Inherited members
ADCPHandler: acquire_rights, activate_signal, build_creative, calibrate_content, check_governance, comply_test_controller, context_match, create_collection_list, create_content_standards, create_media_buy, create_property_list, delete_collection_list, delete_property_list, get_account_financials, get_adcp_capabilities, get_brand_identity, get_collection_list, get_content_standards, get_creative_delivery, get_creative_features, get_media_buy_artifacts, get_media_buy_delivery, get_media_buys, get_plan_audit_logs, get_products, get_property_list, get_rights, get_signals, identity_match, list_accounts, list_collection_lists, list_content_standards, list_creative_formats, list_creatives, list_property_lists, log_event, preview_creative, provide_performance_feedback, report_plan_outcome, report_usage, si_get_offering, si_initiate_session, si_send_message, si_terminate_session, sync_accounts, sync_audiences, sync_catalogs, sync_creatives, sync_event_sources, sync_governance, sync_plans, update_collection_list, update_content_standards, update_media_buy, update_property_list, update_rights, validate_content_delivery
class ContentStandardsHandler-
    class ContentStandardsHandler(ADCPHandler):
        """Handler for Content Standards protocol.

        Subclass this to implement a Content Standards agent. All Content Standards
        operations must be implemented via the handle_* methods.
        The public methods (create_content_standards, etc.) handle validation and
        error handling automatically.

        Non-Content-Standards operations (get_products, create_media_buy, etc.)
        return 'not supported' via the base class.

        Example:
            class MyContentStandardsHandler(ContentStandardsHandler):
                async def handle_create_content_standards(
                    self,
                    request: CreateContentStandardsRequest,
                    context: ToolContext | None = None
                ) -> CreateContentStandardsResponse:
                    # Your implementation
                    return CreateContentStandardsResponse(...)
        """

        _agent_type: str = "Content Standards agents"

        # ========================================================================
        # Content Standards Operations - Override base class with validation
        # ========================================================================

        async def create_content_standards(
            self,
            params: CreateContentStandardsRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> CreateContentStandardsResponse | NotImplementedResponse:
            """Create content standards configuration.

            Validates params and delegates to handle_create_content_standards.
            """
            try:
                request = CreateContentStandardsRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_create_content_standards(request, context)

        async def get_content_standards(
            self,
            params: GetContentStandardsRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> GetContentStandardsResponse | NotImplementedResponse:
            """Get content standards configuration.

            Validates params and delegates to handle_get_content_standards.
            """
            try:
                request = GetContentStandardsRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_get_content_standards(request, context)

        async def list_content_standards(
            self,
            params: ListContentStandardsRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> ListContentStandardsResponse | NotImplementedResponse:
            """List content standards configurations.

            Validates params and delegates to handle_list_content_standards.
            """
            try:
                request = ListContentStandardsRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_list_content_standards(request, context)

        async def update_content_standards(
            self,
            params: UpdateContentStandardsRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> UpdateContentStandardsResponse | NotImplementedResponse:
            """Update content standards configuration.

            Validates params and delegates to handle_update_content_standards.
            """
            try:
                request = UpdateContentStandardsRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_update_content_standards(request, context)

        async def calibrate_content(
            self,
            params: CalibrateContentRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> CalibrateContentResponse | NotImplementedResponse:
            """Calibrate content against standards.

            Validates params and delegates to handle_calibrate_content.
            """
            try:
                request = CalibrateContentRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_calibrate_content(request, context)

        async def validate_content_delivery(
            self,
            params: ValidateContentDeliveryRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> ValidateContentDeliveryResponse | NotImplementedResponse:
            """Validate content delivery against standards.

            Validates params and delegates to handle_validate_content_delivery.
            """
            try:
                request = ValidateContentDeliveryRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_validate_content_delivery(request, context)

        async def get_media_buy_artifacts(
            self,
            params: GetMediaBuyArtifactsRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> GetMediaBuyArtifactsResponse | NotImplementedResponse:
            """Get artifacts associated with a media buy.

            Validates params and delegates to handle_get_media_buy_artifacts.
            """
            try:
                request = GetMediaBuyArtifactsRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_get_media_buy_artifacts(request, context)

        # ========================================================================
        # Abstract handlers - Implement these in subclasses
        # ========================================================================

        @abstractmethod
        async def handle_create_content_standards(
            self,
            request: CreateContentStandardsRequest,
            context: ToolContext | None = None,
        ) -> CreateContentStandardsResponse:
            """Handle create content standards request."""
            ...

        @abstractmethod
        async def handle_get_content_standards(
            self,
            request: GetContentStandardsRequest,
            context: ToolContext | None = None,
        ) -> GetContentStandardsResponse:
            """Handle get content standards request."""
            ...

        @abstractmethod
        async def handle_list_content_standards(
            self,
            request: ListContentStandardsRequest,
            context: ToolContext | None = None,
        ) -> ListContentStandardsResponse:
            """Handle list content standards request."""
            ...

        @abstractmethod
        async def handle_update_content_standards(
            self,
            request: UpdateContentStandardsRequest,
            context: ToolContext | None = None,
        ) -> UpdateContentStandardsResponse:
            """Handle update content standards request."""
            ...

        @abstractmethod
        async def handle_calibrate_content(
            self,
            request: CalibrateContentRequest,
            context: ToolContext | None = None,
        ) -> CalibrateContentResponse:
            """Handle calibrate content request."""
            ...

        @abstractmethod
        async def handle_validate_content_delivery(
            self,
            request: ValidateContentDeliveryRequest,
            context: ToolContext | None = None,
        ) -> ValidateContentDeliveryResponse:
            """Handle validate content delivery request."""
            ...

        @abstractmethod
        async def handle_get_media_buy_artifacts(
            self,
            request: GetMediaBuyArtifactsRequest,
            context: ToolContext | None = None,
        ) -> GetMediaBuyArtifactsResponse:
            """Handle get media buy artifacts request."""
            ...

Handler for Content Standards protocol.
Subclass this to implement a Content Standards agent. All Content Standards operations must be implemented via the handle_* methods. The public methods (create_content_standards, etc.) handle validation and error handling automatically.
Non-Content-Standards operations (get_products, create_media_buy, etc.) return 'not supported' via the base class.
Example
    class MyContentStandardsHandler(ContentStandardsHandler):
        async def handle_create_content_standards(
            self,
            request: CreateContentStandardsRequest,
            context: ToolContext | None = None
        ) -> CreateContentStandardsResponse:
            # Your implementation
            return CreateContentStandardsResponse(...)
Ancestors
- ADCPHandler
- abc.ABC
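The split between public methods and handle_* hooks is a validate-then-delegate arrangement: the public method accepts a model or a raw dict, validates it, and only then calls the abstract hook with a typed request. The sketch below reproduces that flow using only the standard library. The dataclass-based `CreateStandardsRequest`, its `validate` classmethod, the `name` field, and the dict-shaped error are stand-ins chosen for illustration; the real handler uses Pydantic's `model_validate` and `NotImplementedResponse` as shown in the source above.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


@dataclass
class CreateStandardsRequest:
    """Hypothetical stand-in for CreateContentStandardsRequest."""

    name: str

    @classmethod
    def validate(cls, params: Any) -> "CreateStandardsRequest":
        # Accept an already-typed request or a raw dict, like model_validate would.
        if isinstance(params, cls):
            return params
        if not isinstance(params, dict) or not isinstance(params.get("name"), str):
            raise ValueError("'name' must be a string")
        return cls(name=params["name"])


class StandardsHandlerSketch(ABC):
    async def create_content_standards(self, params: Any, context: Any = None) -> Any:
        # Public entry point: validate first, then delegate to the typed hook.
        try:
            request = CreateStandardsRequest.validate(params)
        except ValueError as exc:
            return {
                "supported": False,
                "error": {"code": "VALIDATION_ERROR", "message": str(exc)},
            }
        return await self.handle_create_content_standards(request, context)

    @abstractmethod
    async def handle_create_content_standards(
        self, request: CreateStandardsRequest, context: Any = None
    ) -> Any:
        ...


class InMemoryStandards(StandardsHandlerSketch):
    async def handle_create_content_standards(
        self, request: CreateStandardsRequest, context: Any = None
    ) -> Any:
        # The hook only ever sees a validated, typed request.
        return {"standards_id": f"std-{request.name}"}


handler = InMemoryStandards()
ok = asyncio.run(handler.create_content_standards({"name": "news"}))
bad = asyncio.run(handler.create_content_standards({"title": "news"}))
```

Because the hooks are abstract, a subclass that omits one cannot be instantiated, which catches missing implementations at construction time instead of at request time.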
Methods
async def calibrate_content(self,
params: CalibrateContentRequest | dict[str, Any],
context: ToolContext | None = None) ‑> CalibrateContentResponse1 | CalibrateContentResponse2 | NotImplementedResponse-
    async def calibrate_content(
        self,
        params: CalibrateContentRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> CalibrateContentResponse | NotImplementedResponse:
        """Calibrate content against standards.

        Validates params and delegates to handle_calibrate_content.
        """
        try:
            request = CalibrateContentRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_calibrate_content(request, context)

Calibrate content against standards.
Validates params and delegates to handle_calibrate_content.
async def create_content_standards(self,
params: CreateContentStandardsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> CreateContentStandardsResponse1 | CreateContentStandardsResponse2 | NotImplementedResponse-
    async def create_content_standards(
        self,
        params: CreateContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> CreateContentStandardsResponse | NotImplementedResponse:
        """Create content standards configuration.

        Validates params and delegates to handle_create_content_standards.
        """
        try:
            request = CreateContentStandardsRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_create_content_standards(request, context)

Create content standards configuration.
Validates params and delegates to handle_create_content_standards.
async def get_content_standards(self,
params: GetContentStandardsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> GetContentStandardsResponse1 | GetContentStandardsResponse2 | NotImplementedResponse-
    async def get_content_standards(
        self,
        params: GetContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> GetContentStandardsResponse | NotImplementedResponse:
        """Get content standards configuration.

        Validates params and delegates to handle_get_content_standards.
        """
        try:
            request = GetContentStandardsRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_get_content_standards(request, context)

Get content standards configuration.
Validates params and delegates to handle_get_content_standards.
async def get_media_buy_artifacts(self,
params: GetMediaBuyArtifactsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> GetMediaBuyArtifactsResponse1 | GetMediaBuyArtifactsResponse2 | NotImplementedResponse-
    async def get_media_buy_artifacts(
        self,
        params: GetMediaBuyArtifactsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> GetMediaBuyArtifactsResponse | NotImplementedResponse:
        """Get artifacts associated with a media buy.

        Validates params and delegates to handle_get_media_buy_artifacts.
        """
        try:
            request = GetMediaBuyArtifactsRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_get_media_buy_artifacts(request, context)

Get artifacts associated with a media buy.
Validates params and delegates to handle_get_media_buy_artifacts.
async def handle_calibrate_content(self,
request: CalibrateContentRequest,
context: ToolContext | None = None) ‑> CalibrateContentResponse1 | CalibrateContentResponse2-
    @abstractmethod
    async def handle_calibrate_content(
        self,
        request: CalibrateContentRequest,
        context: ToolContext | None = None,
    ) -> CalibrateContentResponse:
        """Handle calibrate content request."""
        ...

Handle calibrate content request.
async def handle_create_content_standards(self,
request: CreateContentStandardsRequest,
context: ToolContext | None = None) ‑> CreateContentStandardsResponse1 | CreateContentStandardsResponse2-
    @abstractmethod
    async def handle_create_content_standards(
        self,
        request: CreateContentStandardsRequest,
        context: ToolContext | None = None,
    ) -> CreateContentStandardsResponse:
        """Handle create content standards request."""
        ...

Handle create content standards request.
async def handle_get_content_standards(self,
request: GetContentStandardsRequest,
context: ToolContext | None = None) ‑> GetContentStandardsResponse1 | GetContentStandardsResponse2-
    @abstractmethod
    async def handle_get_content_standards(
        self,
        request: GetContentStandardsRequest,
        context: ToolContext | None = None,
    ) -> GetContentStandardsResponse:
        """Handle get content standards request."""
        ...

Handle get content standards request.
async def handle_get_media_buy_artifacts(self,
request: GetMediaBuyArtifactsRequest,
context: ToolContext | None = None) ‑> GetMediaBuyArtifactsResponse1 | GetMediaBuyArtifactsResponse2-
    @abstractmethod
    async def handle_get_media_buy_artifacts(
        self,
        request: GetMediaBuyArtifactsRequest,
        context: ToolContext | None = None,
    ) -> GetMediaBuyArtifactsResponse:
        """Handle get media buy artifacts request."""
        ...

Handle get media buy artifacts request.
async def handle_list_content_standards(self,
request: ListContentStandardsRequest,
context: ToolContext | None = None) ‑> ListContentStandardsResponse1 | ListContentStandardsResponse2-
    @abstractmethod
    async def handle_list_content_standards(
        self,
        request: ListContentStandardsRequest,
        context: ToolContext | None = None,
    ) -> ListContentStandardsResponse:
        """Handle list content standards request."""
        ...

Handle list content standards request.
async def handle_update_content_standards(self,
request: UpdateContentStandardsRequest,
context: ToolContext | None = None) ‑> UpdateContentStandardsResponse1 | UpdateContentStandardsResponse2-
    @abstractmethod
    async def handle_update_content_standards(
        self,
        request: UpdateContentStandardsRequest,
        context: ToolContext | None = None,
    ) -> UpdateContentStandardsResponse:
        """Handle update content standards request."""
        ...

Handle update content standards request.
async def handle_validate_content_delivery(self,
request: ValidateContentDeliveryRequest,
context: ToolContext | None = None) ‑> ValidateContentDeliveryResponse1 | ValidateContentDeliveryResponse2-
    @abstractmethod
    async def handle_validate_content_delivery(
        self,
        request: ValidateContentDeliveryRequest,
        context: ToolContext | None = None,
    ) -> ValidateContentDeliveryResponse:
        """Handle validate content delivery request."""
        ...

Handle validate content delivery request.
async def list_content_standards(self,
params: ListContentStandardsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> ListContentStandardsResponse1 | ListContentStandardsResponse2 | NotImplementedResponse-
    async def list_content_standards(
        self,
        params: ListContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> ListContentStandardsResponse | NotImplementedResponse:
        """List content standards configurations.

        Validates params and delegates to handle_list_content_standards.
        """
        try:
            request = ListContentStandardsRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_list_content_standards(request, context)

List content standards configurations.
Validates params and delegates to handle_list_content_standards.
async def update_content_standards(self,
params: UpdateContentStandardsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> UpdateContentStandardsResponse1 | UpdateContentStandardsResponse2 | NotImplementedResponse-
    async def update_content_standards(
        self,
        params: UpdateContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> UpdateContentStandardsResponse | NotImplementedResponse:
        """Update content standards configuration.

        Validates params and delegates to handle_update_content_standards.
        """
        try:
            request = UpdateContentStandardsRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_update_content_standards(request, context)

Update content standards configuration.
Validates params and delegates to handle_update_content_standards.
async def validate_content_delivery(self,
params: ValidateContentDeliveryRequest | dict[str, Any],
context: ToolContext | None = None) ‑> ValidateContentDeliveryResponse1 | ValidateContentDeliveryResponse2 | NotImplementedResponse-
    async def validate_content_delivery(
        self,
        params: ValidateContentDeliveryRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> ValidateContentDeliveryResponse | NotImplementedResponse:
        """Validate content delivery against standards.

        Validates params and delegates to handle_validate_content_delivery.
        """
        try:
            request = ValidateContentDeliveryRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_validate_content_delivery(request, context)

Validate content delivery against standards.
Validates params and delegates to handle_validate_content_delivery.
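The public methods documented here all follow one shape: validate the incoming params, return an error-shaped response instead of raising when validation fails, and otherwise delegate to the matching abstract handle_* method. The following is a minimal stdlib-only sketch of that shape, not the library's code. `ListRequest`, `ErrorResponse`, `SketchHandler`, and the local `ValidationError` are hypothetical stand-ins for the Pydantic request models, `NotImplementedResponse`, the handler base class, and `pydantic.ValidationError`.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


class ValidationError(Exception):
    """Hypothetical stand-in for pydantic.ValidationError."""


@dataclass
class ListRequest:
    """Hypothetical stand-in for a request model such as ListContentStandardsRequest."""

    limit: int

    @classmethod
    def model_validate(cls, params: Any) -> "ListRequest":
        # Accept an already-built request or a plain dict, mirroring the
        # `Request | dict[str, Any]` parameter type of the public methods.
        if isinstance(params, cls):
            return params
        if not isinstance(params, dict) or not isinstance(params.get("limit"), int):
            raise ValidationError("limit must be an integer")
        return cls(limit=params["limit"])


@dataclass
class ErrorResponse:
    """Hypothetical stand-in for NotImplementedResponse carrying a VALIDATION_ERROR."""

    supported: bool
    reason: str
    code: str


class SketchHandler(ABC):
    async def list_items(self, params: Any, context: Any = None) -> Any:
        # Public entry point: validate first and never raise on bad input.
        try:
            request = ListRequest.model_validate(params)
        except ValidationError as e:
            return ErrorResponse(False, f"Invalid request: {e}", "VALIDATION_ERROR")
        # Only a validated, typed request reaches the business logic.
        return await self.handle_list_items(request, context)

    @abstractmethod
    async def handle_list_items(self, request: ListRequest, context: Any = None) -> Any:
        ...


class MyHandler(SketchHandler):
    async def handle_list_items(self, request: ListRequest, context: Any = None) -> Any:
        return {"items": list(range(request.limit))}


ok = asyncio.run(MyHandler().list_items({"limit": 3}))
bad = asyncio.run(MyHandler().list_items({"limit": "three"}))
```

In this sketch `ok` is the handler's own return value, while `bad` is an `ErrorResponse` whose `code` is `"VALIDATION_ERROR"`; the subclass never sees the malformed input.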
Inherited members
ADCPHandler: acquire_rights, activate_signal, build_creative, check_governance, comply_test_controller, context_match, create_collection_list, create_media_buy, create_property_list, delete_collection_list, delete_property_list, get_account_financials, get_adcp_capabilities, get_brand_identity, get_collection_list, get_creative_delivery, get_creative_features, get_media_buy_delivery, get_media_buys, get_plan_audit_logs, get_products, get_property_list, get_rights, get_signals, identity_match, list_accounts, list_collection_lists, list_creative_formats, list_creatives, list_property_lists, log_event, preview_creative, provide_performance_feedback, report_plan_outcome, report_usage, si_get_offering, si_initiate_session, si_send_message, si_terminate_session, sync_accounts, sync_audiences, sync_catalogs, sync_creatives, sync_event_sources, sync_governance, sync_plans, update_collection_list, update_media_buy, update_property_list, update_rights
class GovernanceHandler-
    class GovernanceHandler(ADCPHandler):
        """Handler for Governance protocol (Property Lists).

        Subclass this to implement a Governance agent that manages property lists
        for brand safety, compliance scoring, and quality filtering.

        All property list operations must be implemented via the handle_* methods.
        The public methods (create_property_list, etc.) handle validation and
        error handling automatically.

        Non-governance operations (get_products, create_media_buy, etc.) return
        'not supported' via the base class.

        Example:
            class MyGovernanceHandler(GovernanceHandler):
                async def handle_create_property_list(
                    self,
                    request: CreatePropertyListRequest,
                    context: ToolContext | None = None
                ) -> CreatePropertyListResponse:
                    # Store the list definition
                    list_id = generate_id()
                    # ...
                    return CreatePropertyListResponse(list=PropertyList(...))
        """

        _agent_type: str = "Governance agents"

        # ========================================================================
        # Governance Operations - Override base class with validation
        # ========================================================================

        async def get_creative_features(
            self,
            params: GetCreativeFeaturesRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> GetCreativeFeaturesResponse | NotImplementedResponse:
            """Evaluate governance features for a creative manifest."""
            try:
                request = GetCreativeFeaturesRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_get_creative_features(request, context)

        async def sync_plans(
            self,
            params: SyncPlansRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> SyncPlansResponse | NotImplementedResponse:
            """Sync campaign governance plans to the agent."""
            try:
                request = SyncPlansRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_sync_plans(request, context)

        async def check_governance(
            self,
            params: CheckGovernanceRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> CheckGovernanceResponse | NotImplementedResponse:
            """Check whether a proposed or committed action complies with plan governance."""
            try:
                request = CheckGovernanceRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_check_governance(request, context)

        async def report_plan_outcome(
            self,
            params: ReportPlanOutcomeRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> ReportPlanOutcomeResponse | NotImplementedResponse:
            """Report the outcome of a previously governed action."""
            try:
                request = ReportPlanOutcomeRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_report_plan_outcome(request, context)

        async def get_plan_audit_logs(
            self,
            params: GetPlanAuditLogsRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> GetPlanAuditLogsResponse | NotImplementedResponse:
            """Retrieve governance audit logs for one or more plans."""
            try:
                request = GetPlanAuditLogsRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_get_plan_audit_logs(request, context)

        async def create_property_list(
            self,
            params: CreatePropertyListRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> CreatePropertyListResponse | NotImplementedResponse:
            """Create a property list for governance filtering.

            Validates params and delegates to handle_create_property_list.
            """
            try:
                request = CreatePropertyListRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_create_property_list(request, context)

        async def get_property_list(
            self,
            params: GetPropertyListRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> GetPropertyListResponse | NotImplementedResponse:
            """Get a property list with optional resolution.

            Validates params and delegates to handle_get_property_list.
            """
            try:
                request = GetPropertyListRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_get_property_list(request, context)

        async def list_property_lists(
            self,
            params: ListPropertyListsRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> ListPropertyListsResponse | NotImplementedResponse:
            """List property lists.

            Validates params and delegates to handle_list_property_lists.
            """
            try:
                request = ListPropertyListsRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_list_property_lists(request, context)

        async def update_property_list(
            self,
            params: UpdatePropertyListRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> UpdatePropertyListResponse | NotImplementedResponse:
            """Update a property list.

            Validates params and delegates to handle_update_property_list.
            """
            try:
                request = UpdatePropertyListRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_update_property_list(request, context)

        async def delete_property_list(
            self,
            params: DeletePropertyListRequest | dict[str, Any],
            context: ToolContext | None = None,
        ) -> DeletePropertyListResponse | NotImplementedResponse:
            """Delete a property list.

            Validates params and delegates to handle_delete_property_list.
            """
            try:
                request = DeletePropertyListRequest.model_validate(params)
            except ValidationError as e:
                return NotImplementedResponse(
                    supported=False,
                    reason=f"Invalid request: {e}",
                    error=Error(code="VALIDATION_ERROR", message=str(e)),
                )
            return await self.handle_delete_property_list(request, context)

        # ========================================================================
        # Abstract handlers - Implement these in subclasses
        # ========================================================================

        @abstractmethod
        async def handle_get_creative_features(
            self,
            request: GetCreativeFeaturesRequest,
            context: ToolContext | None = None,
        ) -> GetCreativeFeaturesResponse:
            """Handle creative feature evaluation."""
            ...

        @abstractmethod
        async def handle_sync_plans(
            self,
            request: SyncPlansRequest,
            context: ToolContext | None = None,
        ) -> SyncPlansResponse:
            """Handle campaign governance plan sync."""
            ...

        @abstractmethod
        async def handle_check_governance(
            self,
            request: CheckGovernanceRequest,
            context: ToolContext | None = None,
        ) -> CheckGovernanceResponse:
            """Handle a governance check request."""
            ...

        @abstractmethod
        async def handle_report_plan_outcome(
            self,
            request: ReportPlanOutcomeRequest,
            context: ToolContext | None = None,
        ) -> ReportPlanOutcomeResponse:
            """Handle reporting of a governed action outcome."""
            ...

        @abstractmethod
        async def handle_get_plan_audit_logs(
            self,
            request: GetPlanAuditLogsRequest,
            context: ToolContext | None = None,
        ) -> GetPlanAuditLogsResponse:
            """Handle retrieval of governance audit logs."""
            ...

        @abstractmethod
        async def handle_create_property_list(
            self,
            request: CreatePropertyListRequest,
            context: ToolContext | None = None,
        ) -> CreatePropertyListResponse:
            """Handle create property list request."""
            ...

        @abstractmethod
        async def handle_get_property_list(
            self,
            request: GetPropertyListRequest,
            context: ToolContext | None = None,
        ) -> GetPropertyListResponse:
            """Handle get property list request."""
            ...

        @abstractmethod
        async def handle_list_property_lists(
            self,
            request: ListPropertyListsRequest,
            context: ToolContext | None = None,
        ) -> ListPropertyListsResponse:
            """Handle list property lists request."""
            ...

        @abstractmethod
        async def handle_update_property_list(
            self,
            request: UpdatePropertyListRequest,
            context: ToolContext | None = None,
        ) -> UpdatePropertyListResponse:
            """Handle update property list request."""
            ...

        @abstractmethod
        async def handle_delete_property_list(
            self,
            request: DeletePropertyListRequest,
            context: ToolContext | None = None,
        ) -> DeletePropertyListResponse:
            """Handle delete property list request."""
            ...

Handler for Governance protocol (Property Lists).
Subclass this to implement a Governance agent that manages property lists for brand safety, compliance scoring, and quality filtering.
All property list operations must be implemented via the handle_* methods. The public methods (create_property_list, etc.) handle validation and error handling automatically.
Non-governance operations (get_products, create_media_buy, etc.) return 'not supported' via the base class.
Example
    class MyGovernanceHandler(GovernanceHandler):
        async def handle_create_property_list(
            self,
            request: CreatePropertyListRequest,
            context: ToolContext | None = None
        ) -> CreatePropertyListResponse:
            # Store the list definition
            list_id = generate_id()
            # ...
            return CreatePropertyListResponse(list=PropertyList(...))
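Because every handle_* method is declared with `@abstractmethod` and the class descends from `abc.ABC`, a subclass that implements only some of them, as the abbreviated example above does, should fail at instantiation until the rest are filled in. The sketch below shows that standard `abc` behaviour with a hypothetical two-method base class (`SketchGovernanceBase`); it does not import the real `GovernanceHandler`, and the returned dicts are placeholders rather than real response models.

```python
from abc import ABC, abstractmethod


class SketchGovernanceBase(ABC):
    """Hypothetical two-method stand-in for GovernanceHandler's abstract handle_* surface."""

    @abstractmethod
    async def handle_create_property_list(self, request, context=None): ...

    @abstractmethod
    async def handle_delete_property_list(self, request, context=None): ...


class Incomplete(SketchGovernanceBase):
    # Implements only one of the two abstract handlers.
    async def handle_create_property_list(self, request, context=None):
        return {"list_id": "pl_1"}  # placeholder payload


class Complete(Incomplete):
    # Fills in the remaining abstract handler.
    async def handle_delete_property_list(self, request, context=None):
        return {"deleted": True}  # placeholder payload


try:
    Incomplete()
    instantiation_error = None
except TypeError as e:
    # abc names the handlers that are still abstract in the message.
    instantiation_error = str(e)

handler = Complete()  # succeeds: every abstract handler is implemented
```

In practice this means a real Governance agent would likely need a stub for each handle_* method before the handler can be constructed and served.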
Ancestors
- ADCPHandler
- abc.ABC
Methods
async def check_governance(self,
params: CheckGovernanceRequest | dict[str, Any],
context: ToolContext | None = None) ‑> CheckGovernanceResponse | NotImplementedResponse-
    async def check_governance(
        self,
        params: CheckGovernanceRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> CheckGovernanceResponse | NotImplementedResponse:
        """Check whether a proposed or committed action complies with plan governance."""
        try:
            request = CheckGovernanceRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_check_governance(request, context)

Check whether a proposed or committed action complies with plan governance.
async def create_property_list(self,
params: CreatePropertyListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> CreatePropertyListResponse | NotImplementedResponse-
    async def create_property_list(
        self,
        params: CreatePropertyListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> CreatePropertyListResponse | NotImplementedResponse:
        """Create a property list for governance filtering.

        Validates params and delegates to handle_create_property_list.
        """
        try:
            request = CreatePropertyListRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_create_property_list(request, context)

Create a property list for governance filtering.
Validates params and delegates to handle_create_property_list.
async def delete_property_list(self,
params: DeletePropertyListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> DeletePropertyListResponse | NotImplementedResponse-
    async def delete_property_list(
        self,
        params: DeletePropertyListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> DeletePropertyListResponse | NotImplementedResponse:
        """Delete a property list.

        Validates params and delegates to handle_delete_property_list.
        """
        try:
            request = DeletePropertyListRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_delete_property_list(request, context)

Delete a property list.
Validates params and delegates to handle_delete_property_list.
async def get_creative_features(self,
params: GetCreativeFeaturesRequest | dict[str, Any],
context: ToolContext | None = None) ‑> GetCreativeFeaturesResponse1 | GetCreativeFeaturesResponse2 | NotImplementedResponse-
    async def get_creative_features(
        self,
        params: GetCreativeFeaturesRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> GetCreativeFeaturesResponse | NotImplementedResponse:
        """Evaluate governance features for a creative manifest."""
        try:
            request = GetCreativeFeaturesRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_get_creative_features(request, context)

Evaluate governance features for a creative manifest.
async def get_plan_audit_logs(self,
params: GetPlanAuditLogsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> GetPlanAuditLogsResponse | NotImplementedResponse-
    async def get_plan_audit_logs(
        self,
        params: GetPlanAuditLogsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> GetPlanAuditLogsResponse | NotImplementedResponse:
        """Retrieve governance audit logs for one or more plans."""
        try:
            request = GetPlanAuditLogsRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_get_plan_audit_logs(request, context)

Retrieve governance audit logs for one or more plans.
async def get_property_list(self,
params: GetPropertyListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> GetPropertyListResponse | NotImplementedResponse-
    async def get_property_list(
        self,
        params: GetPropertyListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> GetPropertyListResponse | NotImplementedResponse:
        """Get a property list with optional resolution.

        Validates params and delegates to handle_get_property_list.
        """
        try:
            request = GetPropertyListRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_get_property_list(request, context)

Get a property list with optional resolution.
Validates params and delegates to handle_get_property_list.
async def handle_check_governance(self,
request: CheckGovernanceRequest,
context: ToolContext | None = None) ‑> CheckGovernanceResponse-
    @abstractmethod
    async def handle_check_governance(
        self,
        request: CheckGovernanceRequest,
        context: ToolContext | None = None,
    ) -> CheckGovernanceResponse:
        """Handle a governance check request."""
        ...

Handle a governance check request.
async def handle_create_property_list(self,
request: CreatePropertyListRequest,
context: ToolContext | None = None) ‑> CreatePropertyListResponse-
    @abstractmethod
    async def handle_create_property_list(
        self,
        request: CreatePropertyListRequest,
        context: ToolContext | None = None,
    ) -> CreatePropertyListResponse:
        """Handle create property list request."""
        ...

Handle create property list request.
async def handle_delete_property_list(self,
request: DeletePropertyListRequest,
context: ToolContext | None = None) ‑> DeletePropertyListResponse-
    @abstractmethod
    async def handle_delete_property_list(
        self,
        request: DeletePropertyListRequest,
        context: ToolContext | None = None,
    ) -> DeletePropertyListResponse:
        """Handle delete property list request."""
        ...

Handle delete property list request.
async def handle_get_creative_features(self,
request: GetCreativeFeaturesRequest,
context: ToolContext | None = None) ‑> GetCreativeFeaturesResponse1 | GetCreativeFeaturesResponse2-
    @abstractmethod
    async def handle_get_creative_features(
        self,
        request: GetCreativeFeaturesRequest,
        context: ToolContext | None = None,
    ) -> GetCreativeFeaturesResponse:
        """Handle creative feature evaluation."""
        ...

Handle creative feature evaluation.
async def handle_get_plan_audit_logs(self,
request: GetPlanAuditLogsRequest,
context: ToolContext | None = None) ‑> GetPlanAuditLogsResponse-
    @abstractmethod
    async def handle_get_plan_audit_logs(
        self,
        request: GetPlanAuditLogsRequest,
        context: ToolContext | None = None,
    ) -> GetPlanAuditLogsResponse:
        """Handle retrieval of governance audit logs."""
        ...

Handle retrieval of governance audit logs.
async def handle_get_property_list(self,
request: GetPropertyListRequest,
context: ToolContext | None = None) ‑> GetPropertyListResponse-
    @abstractmethod
    async def handle_get_property_list(
        self,
        request: GetPropertyListRequest,
        context: ToolContext | None = None,
    ) -> GetPropertyListResponse:
        """Handle get property list request."""
        ...

Handle get property list request.
async def handle_list_property_lists(self,
request: ListPropertyListsRequest,
context: ToolContext | None = None) ‑> ListPropertyListsResponse-
    @abstractmethod
    async def handle_list_property_lists(
        self,
        request: ListPropertyListsRequest,
        context: ToolContext | None = None,
    ) -> ListPropertyListsResponse:
        """Handle list property lists request."""
        ...

Handle list property lists request.
async def handle_report_plan_outcome(self,
request: ReportPlanOutcomeRequest,
context: ToolContext | None = None) ‑> ReportPlanOutcomeResponse-
    @abstractmethod
    async def handle_report_plan_outcome(
        self,
        request: ReportPlanOutcomeRequest,
        context: ToolContext | None = None,
    ) -> ReportPlanOutcomeResponse:
        """Handle reporting of a governed action outcome."""
        ...

Handle reporting of a governed action outcome.
async def handle_sync_plans(self,
request: SyncPlansRequest,
context: ToolContext | None = None) ‑> SyncPlansResponse-
    @abstractmethod
    async def handle_sync_plans(
        self,
        request: SyncPlansRequest,
        context: ToolContext | None = None,
    ) -> SyncPlansResponse:
        """Handle campaign governance plan sync."""
        ...

Handle campaign governance plan sync.
async def handle_update_property_list(self,
request: UpdatePropertyListRequest,
context: ToolContext | None = None) ‑> UpdatePropertyListResponse-
    @abstractmethod
    async def handle_update_property_list(
        self,
        request: UpdatePropertyListRequest,
        context: ToolContext | None = None,
    ) -> UpdatePropertyListResponse:
        """Handle update property list request."""
        ...

Handle update property list request.
async def list_property_lists(self,
params: ListPropertyListsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> ListPropertyListsResponse | NotImplementedResponse-
    async def list_property_lists(
        self,
        params: ListPropertyListsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> ListPropertyListsResponse | NotImplementedResponse:
        """List property lists.

        Validates params and delegates to handle_list_property_lists.
        """
        try:
            request = ListPropertyListsRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_list_property_lists(request, context)

List property lists.
Validates params and delegates to handle_list_property_lists.
async def report_plan_outcome(self,
params: ReportPlanOutcomeRequest | dict[str, Any],
context: ToolContext | None = None) ‑> ReportPlanOutcomeResponse | NotImplementedResponse-
    async def report_plan_outcome(
        self,
        params: ReportPlanOutcomeRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> ReportPlanOutcomeResponse | NotImplementedResponse:
        """Report the outcome of a previously governed action."""
        try:
            request = ReportPlanOutcomeRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_report_plan_outcome(request, context)

Report the outcome of a previously governed action.
async def sync_plans(self,
params: SyncPlansRequest | dict[str, Any],
context: ToolContext | None = None) ‑> SyncPlansResponse | NotImplementedResponse-
    async def sync_plans(
        self,
        params: SyncPlansRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> SyncPlansResponse | NotImplementedResponse:
        """Sync campaign governance plans to the agent."""
        try:
            request = SyncPlansRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_sync_plans(request, context)

Sync campaign governance plans to the agent.
async def update_property_list(self,
params: UpdatePropertyListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> UpdatePropertyListResponse | NotImplementedResponse-
    async def update_property_list(
        self,
        params: UpdatePropertyListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> UpdatePropertyListResponse | NotImplementedResponse:
        """Update a property list.

        Validates params and delegates to handle_update_property_list.
        """
        try:
            request = UpdatePropertyListRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_update_property_list(request, context)

Update a property list.
Validates params and delegates to handle_update_property_list.
Inherited members
ADCPHandler: acquire_rights, activate_signal, build_creative, calibrate_content, comply_test_controller, context_match, create_collection_list, create_content_standards, create_media_buy, delete_collection_list, get_account_financials, get_adcp_capabilities, get_brand_identity, get_collection_list, get_content_standards, get_creative_delivery, get_media_buy_artifacts, get_media_buy_delivery, get_media_buys, get_products, get_rights, get_signals, identity_match, list_accounts, list_collection_lists, list_content_standards, list_creative_formats, list_creatives, log_event, preview_creative, provide_performance_feedback, report_usage, si_get_offering, si_initiate_session, si_send_message, si_terminate_session, sync_accounts, sync_audiences, sync_catalogs, sync_creatives, sync_event_sources, sync_governance, update_collection_list, update_content_standards, update_media_buy, update_rights, validate_content_delivery
class IdempotencyStore (backend: IdempotencyBackend,
ttl_seconds: int = 86400,
hash_fn: Callable[[dict[str, Any]], str] = <function canonical_json_sha256>,
*,
clock: Callable[[], float] = <built-in function time>)-
Expand source code
class IdempotencyStore: """Coordinator that binds canonical hashing to a storage backend. :param backend: A concrete :class:`IdempotencyBackend`. :param ttl_seconds: How long cached responses remain replayable. Must be within the spec's ``[3600, 604800]`` range (1h to 7d). 86400 (24h) is the recommended floor and matches the compliance storyboard. :param hash_fn: Optional override for the canonical hash function. Defaults to :func:`canonical_json_sha256`. Exposed for tests and for anyone who wants to experiment with alternative equivalence rules — though note the spec mandates RFC 8785 JCS for interop. """ def __init__( self, backend: IdempotencyBackend, ttl_seconds: int = 86400, hash_fn: Callable[[dict[str, Any]], str] = canonical_json_sha256, *, clock: Callable[[], float] = time.time, ) -> None: if not _MIN_TTL_SECONDS <= ttl_seconds <= _MAX_TTL_SECONDS: raise ValueError( f"ttl_seconds must be in [{_MIN_TTL_SECONDS}, {_MAX_TTL_SECONDS}] " f"per AdCP spec (capabilities.idempotency.replay_ttl_seconds), " f"got {ttl_seconds}" ) self.backend = backend self.ttl_seconds = ttl_seconds self._hash_fn = hash_fn self._clock = clock def capability(self) -> dict[str, Any]: """Return the capabilities fragment declaring this store's replay window. Embed under ``capabilities.adcp.idempotency`` on the seller's ``get_adcp_capabilities`` response. Buyers read this to reason about retry-safe windows (AdCP #2315):: caps.adcp.idempotency = idempotency.capability() # → {"supported": True, "replay_ttl_seconds": 86400} ``supported`` became REQUIRED in AdCP 3.0 GA — agents emitting only ``replay_ttl_seconds`` fail strict schema validation on the new capabilities response. """ return {"supported": True, "replay_ttl_seconds": self.ttl_seconds} def wrap(self, handler: HandlerFn) -> HandlerFn: """Decorator that adds idempotency semantics to an AdCP handler method. The wrapped handler is called as ``handler(self, params, context)``. 
``params`` may be a dict or a Pydantic model — both are normalized to a dict before hashing. The return value is coerced to a dict for caching (via ``model_dump`` if Pydantic). The decorator always returns the handler's original object on a cache miss and a best-effort Pydantic re-validation on a hit (when the handler's declared return type exposes ``model_validate``). Callers that return raw dicts get dicts back. """ @wraps(handler) async def _wrapped( handler_self: Any, params: Any, context: Any = None, *args: Any, **kwargs: Any, ) -> Any: principal_id, idempotency_key, params_dict = self._prepare(params, context) if principal_id is None or idempotency_key is None: # No key → spec says the server MUST reject with INVALID_REQUEST. # We let the handler run so validation layers above us (Pydantic, # FastAPI, etc.) can reject with a typed error; the middleware's # job is only to dedup when a key IS present. return await handler(handler_self, params, context, *args, **kwargs) payload_hash = self._hash_fn(params_dict) cached = await self.backend.get(principal_id, idempotency_key) if cached is not None: if cached.payload_hash == payload_hash: logger.debug( "idempotency replay: principal=%s key_prefix=%s", principal_id, idempotency_key[:8], ) return _clone_response(cached.response) # Same key, different payload — spec-defined conflict. raise IdempotencyConflictError( operation=getattr(handler, "__name__", "handler"), errors=[ { "code": "IDEMPOTENCY_CONFLICT", "message": ( "idempotency_key reused with a different payload " "(canonical hash mismatch)" ), } ], ) response = await handler(handler_self, params, context, *args, **kwargs) # Deep-copy when caching so post-return mutation of the caller's # copy can't poison future replays. `_clone_response` also deep- # copies on the hit path, giving independent objects per replay. 
response_dict = copy.deepcopy(_to_dict(response)) entry = CachedResponse( payload_hash=payload_hash, response=response_dict, expires_at_epoch=self._clock() + self.ttl_seconds, ) # Commit cache AFTER handler returns. Atomicity with the handler's # side effects depends on the backend: MemoryBackend is best-effort # (no transactional relationship to external resources); PgBackend # (follow-up) will commit in the same transaction when the handler # uses the same engine. On put failure we log loudly and return # the handler's response — swallowing the exception would be wrong # (operators need the signal that caching is broken), and raising # would look to the caller like the handler failed, triggering a # retry that re-executes side effects. Best compromise: warn # operators, return the result, and accept that the next retry # with this key will re-execute. try: await self.backend.put(principal_id, idempotency_key, entry) except Exception: logger.warning( "Idempotency cache put failed for principal=%s key_prefix=%s — " "handler completed but a subsequent retry with this key will " "re-execute rather than replay. This indicates an operational " "issue with the idempotency backend.", principal_id, idempotency_key[:8], exc_info=True, ) return response return _wrapped def _prepare(self, params: Any, context: Any) -> tuple[str | None, str | None, dict[str, Any]]: """Normalize inputs and extract the (principal, key, params_dict) tuple. Returns ``(None, None, params_dict)`` when idempotency doesn't apply (no caller identity or no key supplied). The caller falls through to the plain handler in that case — validation of missing-key lives in the request schema, not here. """ params_dict = _to_dict(params) idempotency_key = params_dict.get("idempotency_key") if not isinstance(idempotency_key, str) or not idempotency_key: return None, None, params_dict principal_id = _extract_principal_id(context) if principal_id is None: # No caller identity: we can't safely scope the key. 
Spec requires # per-principal scope; anything else is a cross-principal replay # attack surface. Fall through to the handler (which will process # the request normally — no dedup, but no security regression). self._warn_missing_principal_once() return None, None, params_dict return principal_id, idempotency_key, params_dict _missing_principal_warned: bool = False def _warn_missing_principal_once(self) -> None: """Emit a one-time warning when the middleware sees a key but no principal. Silent fall-through is the worst DX: the seller drops in ``@idempotency.wrap``, ships, and doesn't discover until incident review that no dedup ever happened. Fire once per store instance so operators see the signal without filling logs on every request. """ if self._missing_principal_warned: return self._missing_principal_warned = True warnings.warn( "IdempotencyStore received a request with idempotency_key but no " "caller_identity on ToolContext — dedup is SKIPPED. This usually " "means your transport isn't populating the authenticated principal. " "A2A: wire an a2a-sdk auth middleware that sets ServerCallContext.user; " "MCP: populate ToolContext.caller_identity from your FastMCP auth " "middleware (see adcp.server.idempotency README). " "This warning fires once per IdempotencyStore instance.", UserWarning, stacklevel=3, )
Coordinator that binds canonical hashing to a storage backend.
:param backend: A concrete :class:`IdempotencyBackend`.
:param ttl_seconds: How long cached responses remain replayable. Must be within the spec's [3600, 604800] range (1h to 7d). 86400 (24h) is the recommended floor and matches the compliance storyboard.
:param hash_fn: Optional override for the canonical hash function. Defaults to :func:`canonical_json_sha256`. Exposed for tests and for anyone who wants to experiment with alternative equivalence rules, though the spec mandates RFC 8785 JCS for interop.
Methods
def capability(self) ‑> dict[str, typing.Any]-
Expand source code
def capability(self) -> dict[str, Any]:
    """Return the capabilities fragment declaring this store's replay window.

    Embed under ``capabilities.adcp.idempotency`` on the seller's
    ``get_adcp_capabilities`` response. Buyers read this to reason about
    retry-safe windows (AdCP #2315)::

        caps.adcp.idempotency = idempotency.capability()
        # → {"supported": True, "replay_ttl_seconds": 86400}

    ``supported`` became REQUIRED in AdCP 3.0 GA — agents emitting only
    ``replay_ttl_seconds`` fail strict schema validation on the new
    capabilities response.
    """
    return {"supported": True, "replay_ttl_seconds": self.ttl_seconds}
Return the capabilities fragment declaring this store's replay window.
Embed under capabilities.adcp.idempotency on the seller's get_adcp_capabilities response. Buyers read this to reason about retry-safe windows (AdCP #2315)::
    caps.adcp.idempotency = idempotency.capability()
    # → {"supported": True, "replay_ttl_seconds": 86400}
supported became REQUIRED in AdCP 3.0 GA; agents emitting only replay_ttl_seconds fail strict schema validation on the new capabilities response.
def wrap(self, handler: HandlerFn) ‑> Callable[..., Awaitable[typing.Any]]-
Expand source code
def wrap(self, handler: HandlerFn) -> HandlerFn: """Decorator that adds idempotency semantics to an AdCP handler method. The wrapped handler is called as ``handler(self, params, context)``. ``params`` may be a dict or a Pydantic model — both are normalized to a dict before hashing. The return value is coerced to a dict for caching (via ``model_dump`` if Pydantic). The decorator always returns the handler's original object on a cache miss and a best-effort Pydantic re-validation on a hit (when the handler's declared return type exposes ``model_validate``). Callers that return raw dicts get dicts back. """ @wraps(handler) async def _wrapped( handler_self: Any, params: Any, context: Any = None, *args: Any, **kwargs: Any, ) -> Any: principal_id, idempotency_key, params_dict = self._prepare(params, context) if principal_id is None or idempotency_key is None: # No key → spec says the server MUST reject with INVALID_REQUEST. # We let the handler run so validation layers above us (Pydantic, # FastAPI, etc.) can reject with a typed error; the middleware's # job is only to dedup when a key IS present. return await handler(handler_self, params, context, *args, **kwargs) payload_hash = self._hash_fn(params_dict) cached = await self.backend.get(principal_id, idempotency_key) if cached is not None: if cached.payload_hash == payload_hash: logger.debug( "idempotency replay: principal=%s key_prefix=%s", principal_id, idempotency_key[:8], ) return _clone_response(cached.response) # Same key, different payload — spec-defined conflict. raise IdempotencyConflictError( operation=getattr(handler, "__name__", "handler"), errors=[ { "code": "IDEMPOTENCY_CONFLICT", "message": ( "idempotency_key reused with a different payload " "(canonical hash mismatch)" ), } ], ) response = await handler(handler_self, params, context, *args, **kwargs) # Deep-copy when caching so post-return mutation of the caller's # copy can't poison future replays. 
`_clone_response` also deep- # copies on the hit path, giving independent objects per replay. response_dict = copy.deepcopy(_to_dict(response)) entry = CachedResponse( payload_hash=payload_hash, response=response_dict, expires_at_epoch=self._clock() + self.ttl_seconds, ) # Commit cache AFTER handler returns. Atomicity with the handler's # side effects depends on the backend: MemoryBackend is best-effort # (no transactional relationship to external resources); PgBackend # (follow-up) will commit in the same transaction when the handler # uses the same engine. On put failure we log loudly and return # the handler's response — swallowing the exception would be wrong # (operators need the signal that caching is broken), and raising # would look to the caller like the handler failed, triggering a # retry that re-executes side effects. Best compromise: warn # operators, return the result, and accept that the next retry # with this key will re-execute. try: await self.backend.put(principal_id, idempotency_key, entry) except Exception: logger.warning( "Idempotency cache put failed for principal=%s key_prefix=%s — " "handler completed but a subsequent retry with this key will " "re-execute rather than replay. This indicates an operational " "issue with the idempotency backend.", principal_id, idempotency_key[:8], exc_info=True, ) return response return _wrapped
Decorator that adds idempotency semantics to an AdCP handler method.
The wrapped handler is called as handler(self, params, context). params may be a dict or a Pydantic model; both are normalized to a dict before hashing. The return value is coerced to a dict for caching (via model_dump if Pydantic).
The decorator always returns the handler's original object on a cache miss and a best-effort Pydantic re-validation on a hit (when the handler's declared return type exposes model_validate). Callers that return raw dicts get dicts back.
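The replay and conflict behaviour described for wrap can be illustrated with a small standalone sketch. It does not import or reproduce IdempotencyStore: MiniIdempotency, canonical_hash, and ConflictError are hypothetical names, the handler takes the principal as a plain argument rather than a ToolContext, and sorted-key json.dumps only approximates the RFC 8785 JCS canonicalization the spec mandates.

```python
from __future__ import annotations

import asyncio
import copy
import hashlib
import json
from functools import wraps
from typing import Any


class ConflictError(Exception):
    """Hypothetical stand-in for IdempotencyConflictError."""


def canonical_hash(params: dict[str, Any]) -> str:
    # Approximation: sorted-key compact JSON, not full RFC 8785 JCS.
    blob = json.dumps(params, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()


class MiniIdempotency:
    def __init__(self) -> None:
        # (principal_id, idempotency_key) -> (payload_hash, cached response)
        self._cache: dict[tuple[str, str], tuple[str, dict[str, Any]]] = {}

    def wrap(self, handler):
        @wraps(handler)
        async def wrapped(params: dict[str, Any], principal_id: str | None = None):
            key = params.get("idempotency_key")
            if not isinstance(key, str) or not key or principal_id is None:
                # No key or no principal: run the handler without dedup.
                return await handler(params)
            payload_hash = canonical_hash(params)
            hit = self._cache.get((principal_id, key))
            if hit is not None:
                cached_hash, cached_response = hit
                if cached_hash == payload_hash:
                    return copy.deepcopy(cached_response)  # replay
                raise ConflictError("idempotency_key reused with a different payload")
            response = await handler(params)
            # Cache a deep copy so later mutation by the caller cannot leak in.
            self._cache[(principal_id, key)] = (payload_hash, copy.deepcopy(response))
            return response

        return wrapped


store = MiniIdempotency()
executions: list[dict[str, Any]] = []  # records each real handler run


@store.wrap
async def create_media_buy(params: dict[str, Any]) -> dict[str, Any]:
    executions.append(params)
    return {"media_buy_id": f"mb-{len(executions)}"}


async def demo() -> tuple[dict[str, Any], dict[str, Any]]:
    request = {"idempotency_key": "key-0001", "budget": 100}
    first = await create_media_buy(request, principal_id="buyer-a")
    second = await create_media_buy(request, principal_id="buyer-a")  # replayed
    return first, second


first, second = asyncio.run(demo())
```

In this sketch the second call returns the cached response without running the handler again, and reusing the same key with a different payload raises ConflictError, which mirrors the IDEMPOTENCY_CONFLICT branch in the source above.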
class MCPToolSet (handler: ADCPHandler)-
Expand source code
class MCPToolSet:
    """Collection of MCP tools from an ADCP handler.

    Provides tool definitions and handlers for registering with an MCP server.
    """

    def __init__(self, handler: ADCPHandler):
        """Create tool set from handler.

        Args:
            handler: ADCP handler instance
        """
        self.handler = handler
        self._filtered_definitions = get_tools_for_handler(handler)
        self._tools: dict[str, Callable[..., Any]] = {}

        # Create tool callers only for filtered tools
        for tool_def in self._filtered_definitions:
            name = tool_def["name"]
            self._tools[name] = create_tool_caller(handler, name)

    @property
    def tool_definitions(self) -> list[dict[str, Any]]:
        """Get MCP tool definitions filtered by handler type."""
        return list(self._filtered_definitions)

    async def call_tool(self, name: str, params: dict[str, Any]) -> Any:
        """Call a tool by name.

        Args:
            name: Tool name
            params: Tool parameters

        Returns:
            Tool result

        Raises:
            KeyError: If tool not found
        """
        if name not in self._tools:
            raise KeyError(f"Unknown tool: {name}")
        return await self._tools[name](params)

    def get_tool_names(self) -> list[str]:
        """Get list of available tool names."""
        return list(self._tools.keys())
Collection of MCP tools from an ADCP handler.
Provides tool definitions and handlers for registering with an MCP server.
Create tool set from handler.
Args
handler- ADCP handler instance
Instance variables
prop tool_definitions : list[dict[str, Any]]-
Expand source code
@property
def tool_definitions(self) -> list[dict[str, Any]]:
    """Get MCP tool definitions filtered by handler type."""
    return list(self._filtered_definitions)
Get MCP tool definitions filtered by handler type.
Methods
async def call_tool(self, name: str, params: dict[str, Any]) ‑> Any-
Expand source code
async def call_tool(self, name: str, params: dict[str, Any]) -> Any:
    """Call a tool by name.

    Args:
        name: Tool name
        params: Tool parameters

    Returns:
        Tool result

    Raises:
        KeyError: If tool not found
    """
    if name not in self._tools:
        raise KeyError(f"Unknown tool: {name}")
    return await self._tools[name](params)
Call a tool by name.
Args
name- Tool name
params- Tool parameters
Returns
Tool result
Raises
KeyError- If tool not found
def get_tool_names(self) ‑> list[str]-
Expand source code
def get_tool_names(self) -> list[str]:
    """Get list of available tool names."""
    return list(self._tools.keys())
Get list of available tool names.
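The name-to-caller dispatch that MCPToolSet performs can be sketched without the library. MiniToolSet and EchoHandler below are hypothetical stand-ins: the real class derives its tool list from get_tools_for_handler and builds callers with create_tool_caller, whereas this sketch simply binds the handler's async methods by name.

```python
import asyncio
from typing import Any, Awaitable, Callable


class EchoHandler:
    """Hypothetical handler exposing one async operation."""

    async def get_products(self, params: dict) -> dict:
        return {"products": [], "brief": params.get("brief")}


class MiniToolSet:
    """Illustrative stand-in for MCPToolSet; not the library class."""

    def __init__(self, handler: Any, tool_names: list) -> None:
        # Map each tool name to the handler's bound async method.
        self._tools: dict = {name: getattr(handler, name) for name in tool_names}

    async def call_tool(self, name: str, params: dict) -> Any:
        if name not in self._tools:
            raise KeyError(f"Unknown tool: {name}")
        caller: Callable[[dict], Awaitable[Any]] = self._tools[name]
        return await caller(params)

    def get_tool_names(self) -> list:
        return list(self._tools.keys())


tools = MiniToolSet(EchoHandler(), ["get_products"])
result = asyncio.run(tools.call_tool("get_products", {"brief": "video"}))
```

As in the documented call_tool, an unregistered name raises KeyError rather than returning an error payload, so a caller that registers these tools with an MCP server would need to handle that exception itself.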
class MemoryBackend (*, clock: Callable[[], float] = <built-in function time>)-
Expand source code
class MemoryBackend(IdempotencyBackend):
    """In-process dict-backed store.

    Suitable for tests, single-process reference implementations, and local
    development. **Not suitable for multi-process deployments** — each worker
    has its own cache, so a retry that lands on a different worker is treated
    as a fresh request.

    Thread safety: the backend uses an :class:`asyncio.Lock` to serialize
    mutations of the shared dict. Reads go through the lock too; for a pure
    in-process backend this is cheap and prevents torn reads across concurrent
    ``get``/``put`` interleaving.

    :param clock: Callable returning the current epoch seconds. Override for
        tests that need to advance time deterministically without
        monkeypatching :mod:`time`. Defaults to :func:`time.time`.
    """

    def __init__(self, *, clock: Callable[[], float] = time.time) -> None:
        self._store: dict[tuple[str, str], CachedResponse] = {}
        self._lock = asyncio.Lock()
        self._clock = clock

    async def get(
        self, principal_id: str, key: str
    ) -> CachedResponse | None:
        async with self._lock:
            entry = self._store.get((principal_id, key))
            if entry is None:
                return None
            if entry.expires_at_epoch <= self._clock():
                # Lazy expiry — drop the stale entry so the next request
                # treats the slot as fresh and races to repopulate.
                del self._store[(principal_id, key)]
                return None
            return entry

    async def put(
        self,
        principal_id: str,
        key: str,
        entry: CachedResponse,
    ) -> None:
        async with self._lock:
            self._store[(principal_id, key)] = entry

    async def delete_expired(self, now_epoch: float | None = None) -> int:
        cutoff = now_epoch if now_epoch is not None else self._clock()
        async with self._lock:
            stale = [k for k, v in self._store.items() if v.expires_at_epoch <= cutoff]
            for k in stale:
                del self._store[k]
            return len(stale)

    async def clear(self) -> None:
        """Remove all cached entries.

        Test-suite hook — handy for resetting state between fixtures when a
        single :class:`MemoryBackend` is shared across multiple tests.
        """
        async with self._lock:
            self._store.clear()

    async def _size(self) -> int:
        """Test-only: return the current entry count."""
        async with self._lock:
            return len(self._store)
In-process dict-backed store.
Suitable for tests, single-process reference implementations, and local development. Not suitable for multi-process deployments — each worker has its own cache, so a retry that lands on a different worker is treated as a fresh request.
Thread safety: the backend uses an :class:`asyncio.Lock` to serialize mutations of the shared dict. Reads go through the lock too; for a pure in-process backend this is cheap and prevents torn reads across concurrent get/put interleaving.
:param clock: Callable returning the current epoch seconds. Override for tests that need to advance time deterministically without monkeypatching :mod:`time`. Defaults to :func:`time.time`.
Ancestors
- IdempotencyBackend
- abc.ABC
Methods
async def clear(self) ‑> None-
Expand source code
async def clear(self) -> None:
    """Remove all cached entries.

    Test-suite hook — handy for resetting state between fixtures when a
    single :class:`MemoryBackend` is shared across multiple tests.
    """
    async with self._lock:
        self._store.clear()
Remove all cached entries.
Test-suite hook, handy for resetting state between fixtures when a single :class:`MemoryBackend` is shared across multiple tests.
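The injectable clock and lazy expiry described for MemoryBackend can be exercised with a standalone sketch. MiniBackend, Entry, and FakeClock are hypothetical names used only for illustration; Entry stands in for CachedResponse and carries just the fields the expiry check needs.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Entry:
    """Hypothetical stand-in for CachedResponse."""

    response: dict
    expires_at_epoch: float


class FakeClock:
    """Callable clock whose time only moves when a test sets it."""

    def __init__(self, now: float = 0.0) -> None:
        self.now = now

    def __call__(self) -> float:
        return self.now


class MiniBackend:
    def __init__(self, *, clock) -> None:
        self._store: dict = {}
        self._lock = asyncio.Lock()
        self._clock = clock

    async def put(self, principal_id: str, key: str, entry: Entry) -> None:
        async with self._lock:
            self._store[(principal_id, key)] = entry

    async def get(self, principal_id: str, key: str):
        async with self._lock:
            entry = self._store.get((principal_id, key))
            if entry is None:
                return None
            if entry.expires_at_epoch <= self._clock():
                # Lazy expiry: drop the stale entry on read.
                del self._store[(principal_id, key)]
                return None
            return entry


async def demo():
    clock = FakeClock(now=1000.0)
    backend = MiniBackend(clock=clock)  # built inside the running loop
    await backend.put("buyer-a", "key-1", Entry({"ok": True}, expires_at_epoch=1060.0))
    before = await backend.get("buyer-a", "key-1")
    clock.now = 1060.0  # advance to the expiry instant
    after = await backend.get("buyer-a", "key-1")
    return before, after


before, after = asyncio.run(demo())
```

Because the comparison is `<=`, an entry is already treated as expired at exactly its expires_at_epoch, which matches the check in the source above.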
Inherited members
class NotImplementedResponse (**data: Any)-
Expand source code
class NotImplementedResponse(BaseModel):
    """Standard response for operations not supported by this handler."""

    supported: bool = False
    reason: str = "This operation is not supported by this agent"
    error: Error | None = None
Standard response for operations not supported by this handler.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var error : Error | None
var model_config
var reason : str
var supported : bool
class ProposalBuilder (name: str, proposal_id: str | None = None)-
Expand source code
class ProposalBuilder: """Builder for ADCP Proposals. Helps construct valid proposals for get_products responses. Proposals represent recommended media plans with budget allocations. Example: proposal = ( ProposalBuilder("Q1 Brand Campaign") .with_description("Balanced awareness campaign") .add_allocation("product-1", 60) .with_rationale("High-impact display") .add_allocation("product-2", 40) .with_rationale("Contextual targeting") .with_budget_guidance(min=10000, recommended=25000, max=50000) .build() ) """ def __init__(self, name: str, proposal_id: str | None = None): """Create a new proposal builder. Args: name: Human-readable name for the proposal proposal_id: Unique ID (auto-generated if not provided) """ self._name = name self._proposal_id = proposal_id or f"proposal-{uuid4().hex[:8]}" self._description: str | None = None self._brief_alignment: str | None = None self._expires_at: datetime | None = None self._allocations: list[dict[str, Any]] = [] self._budget_guidance: dict[str, Any] | None = None self._current_allocation: AllocationBuilder | None = None self._ext: dict[str, Any] | None = None def with_description(self, description: str) -> ProposalBuilder: """Add description explaining the proposal strategy. Args: description: What the proposal achieves """ self._finalize_allocation() self._description = description return self def with_brief_alignment(self, alignment: str) -> ProposalBuilder: """Explain how proposal aligns with campaign brief. Args: alignment: Alignment explanation """ self._finalize_allocation() self._brief_alignment = alignment return self def expires_in(self, days: int = 7) -> ProposalBuilder: """Set expiration relative to now. Args: days: Number of days until expiration """ self._finalize_allocation() self._expires_at = datetime.now(timezone.utc) + timedelta(days=days) return self def expires_at(self, expires: datetime) -> ProposalBuilder: """Set absolute expiration time. 
Args: expires: When the proposal expires """ self._finalize_allocation() self._expires_at = expires return self def add_allocation( self, product_id: str, allocation_percentage: float, ) -> ProposalBuilder: """Add a product allocation. After calling this, chain allocation methods (with_rationale, etc.) before adding another allocation or calling build(). Args: product_id: ID of the product allocation_percentage: Percentage of budget (0-100) Returns: Self for method chaining """ self._finalize_allocation() self._current_allocation = AllocationBuilder(product_id, allocation_percentage) return self def with_pricing_option(self, pricing_option_id: str) -> ProposalBuilder: """Set pricing option for current allocation.""" if self._current_allocation: self._current_allocation.with_pricing_option(pricing_option_id) return self def with_rationale(self, rationale: str) -> ProposalBuilder: """Add rationale for current allocation.""" if self._current_allocation: self._current_allocation.with_rationale(rationale) return self def with_sequence(self, sequence: int) -> ProposalBuilder: """Set sequence for current allocation.""" if self._current_allocation: self._current_allocation.with_sequence(sequence) return self def with_tags(self, tags: list[str]) -> ProposalBuilder: """Add tags for current allocation.""" if self._current_allocation: self._current_allocation.with_tags(tags) return self def with_budget_guidance( self, *, min: float | None = None, recommended: float | None = None, max: float | None = None, currency: str = "USD", ) -> ProposalBuilder: """Add budget guidance for the proposal. 
Args: min: Minimum recommended budget recommended: Optimal budget max: Maximum before diminishing returns currency: ISO 4217 currency code """ self._finalize_allocation() self._budget_guidance = { "currency": currency, } if min is not None: self._budget_guidance["min"] = min if recommended is not None: self._budget_guidance["recommended"] = recommended if max is not None: self._budget_guidance["max"] = max return self def with_extension(self, ext: dict[str, Any]) -> ProposalBuilder: """Add extension data. Args: ext: Extension object """ self._finalize_allocation() self._ext = ext return self def _finalize_allocation(self) -> None: """Finalize current allocation and add to list.""" if self._current_allocation: self._allocations.append(self._current_allocation.build()) self._current_allocation = None def build(self) -> dict[str, Any]: """Build the proposal dict. Returns: Proposal as a dict ready for use in get_products response Raises: ValueError: If allocations don't sum to 100 """ self._finalize_allocation() if not self._allocations: raise ValueError("Proposal must have at least one allocation") total = sum(a["allocation_percentage"] for a in self._allocations) if abs(total - 100.0) > 0.01: raise ValueError(f"Allocation percentages must sum to 100, got {total}") proposal: dict[str, Any] = { "proposal_id": self._proposal_id, "name": self._name, "allocations": self._allocations, } if self._description: proposal["description"] = self._description if self._brief_alignment: proposal["brief_alignment"] = self._brief_alignment if self._expires_at: proposal["expires_at"] = self._expires_at.isoformat() if self._budget_guidance: proposal["total_budget_guidance"] = self._budget_guidance if self._ext: proposal["ext"] = self._ext return proposal def validate(self) -> list[str]: """Validate the proposal without building. 
Returns: List of validation errors (empty if valid) """ errors: list[str] = [] if self._current_allocation: allocations = self._allocations + [self._current_allocation.build()] else: allocations = self._allocations if not allocations: errors.append("Proposal must have at least one allocation") else: total = sum(a["allocation_percentage"] for a in allocations) if abs(total - 100.0) > 0.01: errors.append(f"Allocation percentages must sum to 100, got {total}") return errors
Builder for ADCP Proposals.
Helps construct valid proposals for get_products responses. Proposals represent recommended media plans with budget allocations.
Example
proposal = (
    ProposalBuilder("Q1 Brand Campaign")
    .with_description("Balanced awareness campaign")
    .add_allocation("product-1", 60)
    .with_rationale("High-impact display")
    .add_allocation("product-2", 40)
    .with_rationale("Contextual targeting")
    .with_budget_guidance(min=10000, recommended=25000, max=50000)
    .build()
)
Create a new proposal builder.
Args
name- Human-readable name for the proposal
proposal_id- Unique ID (auto-generated if not provided)
Methods
def add_allocation(self, product_id: str, allocation_percentage: float) ‑> ProposalBuilder-
Expand source code
def add_allocation(
    self,
    product_id: str,
    allocation_percentage: float,
) -> ProposalBuilder:
    """Add a product allocation.

    After calling this, chain allocation methods (with_rationale, etc.)
    before adding another allocation or calling build().

    Args:
        product_id: ID of the product
        allocation_percentage: Percentage of budget (0-100)

    Returns:
        Self for method chaining
    """
    self._finalize_allocation()
    self._current_allocation = AllocationBuilder(product_id, allocation_percentage)
    return self
Add a product allocation.
After calling this, chain allocation methods (with_rationale, etc.) before adding another allocation or calling build().
Args
product_id- ID of the product
allocation_percentage- Percentage of budget (0-100)
Returns
Self for method chaining
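The "current allocation" chaining described for add_allocation can be sketched in isolation. MiniProposalBuilder is a hypothetical reduction of ProposalBuilder that keeps only the pending-allocation logic; the "rationale" key is an assumed field name, since AllocationBuilder's output is not shown in this section.

```python
from __future__ import annotations

from typing import Any


class MiniProposalBuilder:
    """Hypothetical reduction of ProposalBuilder to its chaining logic."""

    def __init__(self) -> None:
        self._allocations: list[dict[str, Any]] = []
        self._current: dict[str, Any] | None = None

    def _finalize(self) -> None:
        # Move the pending allocation (if any) into the finished list.
        if self._current is not None:
            self._allocations.append(self._current)
            self._current = None

    def add_allocation(self, product_id: str, pct: float) -> MiniProposalBuilder:
        self._finalize()  # the previous allocation is closed here
        self._current = {"product_id": product_id, "allocation_percentage": pct}
        return self

    def with_rationale(self, rationale: str) -> MiniProposalBuilder:
        # Applies only to the pending allocation; a no-op when none exists.
        if self._current is not None:
            self._current["rationale"] = rationale
        return self

    def build(self) -> list[dict[str, Any]]:
        self._finalize()
        return self._allocations


allocations = (
    MiniProposalBuilder()
    .with_rationale("ignored: no allocation is pending yet")
    .add_allocation("product-1", 60)
    .with_rationale("High-impact display")
    .add_allocation("product-2", 40)
    .build()
)
```

The first with_rationale call is silently dropped because no allocation is pending, which is the same guard the per-allocation methods in the source use (`if self._current_allocation:`).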
def build(self) ‑> dict[str, typing.Any]-
Expand source code
def build(self) -> dict[str, Any]:
    """Build the proposal dict.

    Returns:
        Proposal as a dict ready for use in get_products response

    Raises:
        ValueError: If there are no allocations or the allocation
            percentages don't sum to 100
    """
    self._finalize_allocation()

    if not self._allocations:
        raise ValueError("Proposal must have at least one allocation")

    total = sum(a["allocation_percentage"] for a in self._allocations)
    if abs(total - 100.0) > 0.01:
        raise ValueError(f"Allocation percentages must sum to 100, got {total}")

    proposal: dict[str, Any] = {
        "proposal_id": self._proposal_id,
        "name": self._name,
        "allocations": self._allocations,
    }

    # Optional fields are emitted only when they were set on the builder.
    if self._description:
        proposal["description"] = self._description
    if self._brief_alignment:
        proposal["brief_alignment"] = self._brief_alignment
    if self._expires_at:
        proposal["expires_at"] = self._expires_at.isoformat()
    if self._budget_guidance:
        proposal["total_budget_guidance"] = self._budget_guidance
    if self._ext:
        proposal["ext"] = self._ext

    return proposal
Build the proposal dict.
Returns
Proposal as a dict ready for use in get_products response
Raises
ValueError- If there are no allocations or the allocation percentages don't sum to 100
def expires_at(self, expires: datetime) ‑> ProposalBuilder-
Expand source code
def expires_at(self, expires: datetime) -> ProposalBuilder:
    """Set absolute expiration time.

    Args:
        expires: When the proposal expires
    """
    self._finalize_allocation()
    self._expires_at = expires
    return self
Set absolute expiration time.
Args
expires- When the proposal expires
def expires_in(self, days: int = 7) ‑> ProposalBuilder-
Expand source code
def expires_in(self, days: int = 7) -> ProposalBuilder:
    """Set expiration relative to now.

    Args:
        days: Number of days until expiration
    """
    self._finalize_allocation()
    self._expires_at = datetime.now(timezone.utc) + timedelta(days=days)
    return self
Set expiration relative to now.
Args
days- Number of days until expiration
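As a rough illustration of what expires_in stores and what build() later serializes, the sketch below computes a UTC expiry and renders it with isoformat(). The helper name expiry_in and its `now` parameter are hypothetical; the parameter exists only so the example is deterministic.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def expiry_in(days: int = 7, *, now: datetime | None = None) -> str:
    """Return an ISO 8601 expiry timestamp `days` from `now` (UTC by default)."""
    start = now if now is not None else datetime.now(timezone.utc)
    return (start + timedelta(days=days)).isoformat()


fixed_now = datetime(2025, 1, 1, tzinfo=timezone.utc)
expires_at = expiry_in(7, now=fixed_now)
```

Because the datetime is timezone-aware, the serialized value carries an explicit `+00:00` offset rather than a naive local time.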
def validate(self) ‑> list[str]-
Expand source code
def validate(self) -> list[str]:
    """Validate the proposal without building.

    Returns:
        List of validation errors (empty if valid)
    """
    errors: list[str] = []

    # Include the pending allocation without finalizing it.
    if self._current_allocation:
        allocations = self._allocations + [self._current_allocation.build()]
    else:
        allocations = self._allocations

    if not allocations:
        errors.append("Proposal must have at least one allocation")
    else:
        total = sum(a["allocation_percentage"] for a in allocations)
        if abs(total - 100.0) > 0.01:
            errors.append(f"Allocation percentages must sum to 100, got {total}")

    return errors
Validate the proposal without building.
Returns
List of validation errors (empty if valid)
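The rule that validate() and build() share (at least one allocation, and percentages summing to 100 within a tolerance of 0.01) can be tried on a few inputs with a standalone function. allocation_errors is a hypothetical helper that takes bare percentages instead of allocation dicts.

```python
def allocation_errors(percentages: list) -> list:
    """Return validation messages for a list of allocation percentages."""
    errors = []
    if not percentages:
        errors.append("Proposal must have at least one allocation")
    else:
        total = sum(percentages)
        # Small tolerance so float splits such as 33.33/33.33/33.34 pass.
        if abs(total - 100.0) > 0.01:
            errors.append(f"Allocation percentages must sum to 100, got {total}")
    return errors


ok = allocation_errors([60, 40])
thirds = allocation_errors([33.33, 33.33, 33.34])
short = allocation_errors([60, 30])
empty = allocation_errors([])
```

The tolerance absorbs floating-point rounding in fractional splits, while a plan that is genuinely short of 100 is still reported.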
def with_brief_alignment(self, alignment: str) ‑> ProposalBuilder-
Expand source code
def with_brief_alignment(self, alignment: str) -> ProposalBuilder:
    """Explain how proposal aligns with campaign brief.

    Args:
        alignment: Alignment explanation
    """
    self._finalize_allocation()
    self._brief_alignment = alignment
    return self
Explain how proposal aligns with campaign brief.
Args
alignment- Alignment explanation
def with_budget_guidance(self,
*,
min: float | None = None,
recommended: float | None = None,
max: float | None = None,
currency: str = 'USD') ‑> ProposalBuilder-
Expand source code
def with_budget_guidance(
    self,
    *,
    min: float | None = None,
    recommended: float | None = None,
    max: float | None = None,
    currency: str = "USD",
) -> ProposalBuilder:
    """Add budget guidance for the proposal.

    Args:
        min: Minimum recommended budget
        recommended: Optimal budget
        max: Maximum before diminishing returns
        currency: ISO 4217 currency code
    """
    self._finalize_allocation()
    self._budget_guidance = {
        "currency": currency,
    }
    if min is not None:
        self._budget_guidance["min"] = min
    if recommended is not None:
        self._budget_guidance["recommended"] = recommended
    if max is not None:
        self._budget_guidance["max"] = max
    return self
Add budget guidance for the proposal.
Args
min- Minimum recommended budget
recommended- Optimal budget
max- Maximum before diminishing returns
currency- ISO 4217 currency code
def with_description(self, description: str) ‑> ProposalBuilder-
Expand source code
def with_description(self, description: str) -> ProposalBuilder:
    """Add description explaining the proposal strategy.

    Args:
        description: What the proposal achieves
    """
    self._finalize_allocation()
    self._description = description
    return self
Add description explaining the proposal strategy.
Args
description- What the proposal achieves
def with_extension(self, ext: dict[str, Any]) ‑> ProposalBuilder-
Expand source code
def with_extension(self, ext: dict[str, Any]) -> ProposalBuilder:
    """Add extension data.

    Args:
        ext: Extension object
    """
    self._finalize_allocation()
    self._ext = ext
    return self
Add extension data.
Args
ext- Extension object
def with_pricing_option(self, pricing_option_id: str) ‑> ProposalBuilder-
Expand source code
def with_pricing_option(self, pricing_option_id: str) -> ProposalBuilder:
    """Set pricing option for current allocation."""
    if self._current_allocation:
        self._current_allocation.with_pricing_option(pricing_option_id)
    return self
Set pricing option for current allocation.
def with_rationale(self, rationale: str) ‑> ProposalBuilder-
Expand source code
def with_rationale(self, rationale: str) -> ProposalBuilder:
    """Add rationale for current allocation."""
    if self._current_allocation:
        self._current_allocation.with_rationale(rationale)
    return self
Add rationale for current allocation.
def with_sequence(self, sequence: int) ‑> ProposalBuilder-
Expand source code
def with_sequence(self, sequence: int) -> ProposalBuilder:
    """Set sequence for current allocation."""
    if self._current_allocation:
        self._current_allocation.with_sequence(sequence)
    return self
Set sequence for current allocation.
def with_tags(self, tags: list[str]) ‑> ProposalBuilder-
Expand source code
def with_tags(self, tags: list[str]) -> ProposalBuilder:
    """Add tags for current allocation."""
    if self._current_allocation:
        self._current_allocation.with_tags(tags)
    return self
Add tags for current allocation.
class ProposalNotSupported (**data: Any)-
Expand source code
class ProposalNotSupported(BaseModel):
    """Response indicating proposal generation is not supported.

    Use this when your agent supports get_products but not proposal generation.
    """

    proposals_supported: bool = False
    reason: str = "This agent does not generate proposals"
    error: Error | None = None
Response indicating proposal generation is not supported.
Use this when your agent supports get_products but not proposal generation.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var error : Error | None
var model_config
var proposals_supported : bool
var reason : str
class SponsoredIntelligenceHandler-
Expand source code
class SponsoredIntelligenceHandler(ADCPHandler): """Handler for Sponsored Intelligence protocol. Subclass this to implement a Sponsored Intelligence agent. All SI operations must be implemented via the handle_* methods. The public methods (si_get_offering, etc.) handle validation and error handling automatically. Non-SI operations (get_products, create_media_buy, content standards, etc.) return 'not supported' via the base class. Example: class MySIHandler(SponsoredIntelligenceHandler): async def handle_si_get_offering( self, request: SiGetOfferingRequest, context: ToolContext | None = None ) -> SiGetOfferingResponse: # Your implementation return SiGetOfferingResponse(...) """ _agent_type: str = "Sponsored Intelligence agents" # ======================================================================== # Sponsored Intelligence Operations - Override base class with validation # ======================================================================== async def si_get_offering( self, params: SiGetOfferingRequest | dict[str, Any], context: ToolContext | None = None, ) -> SiGetOfferingResponse | NotImplementedResponse: """Get sponsored intelligence offering. Validates params and delegates to handle_si_get_offering. """ try: request = SiGetOfferingRequest.model_validate(params) except ValidationError as e: return NotImplementedResponse( supported=False, reason=f"Invalid request: {e}", error=Error(code="VALIDATION_ERROR", message=str(e)), ) return await self.handle_si_get_offering(request, context) async def si_initiate_session( self, params: SiInitiateSessionRequest | dict[str, Any], context: ToolContext | None = None, ) -> SiInitiateSessionResponse | NotImplementedResponse: """Initiate sponsored intelligence session. Validates params and delegates to handle_si_initiate_session. 
""" try: request = SiInitiateSessionRequest.model_validate(params) except ValidationError as e: return NotImplementedResponse( supported=False, reason=f"Invalid request: {e}", error=Error(code="VALIDATION_ERROR", message=str(e)), ) return await self.handle_si_initiate_session(request, context) async def si_send_message( self, params: SiSendMessageRequest | dict[str, Any], context: ToolContext | None = None, ) -> SiSendMessageResponse | NotImplementedResponse: """Send message in sponsored intelligence session. Validates params and delegates to handle_si_send_message. """ try: request = SiSendMessageRequest.model_validate(params) except ValidationError as e: return NotImplementedResponse( supported=False, reason=f"Invalid request: {e}", error=Error(code="VALIDATION_ERROR", message=str(e)), ) return await self.handle_si_send_message(request, context) async def si_terminate_session( self, params: SiTerminateSessionRequest | dict[str, Any], context: ToolContext | None = None, ) -> SiTerminateSessionResponse | NotImplementedResponse: """Terminate sponsored intelligence session. Validates params and delegates to handle_si_terminate_session. """ try: request = SiTerminateSessionRequest.model_validate(params) except ValidationError as e: return NotImplementedResponse( supported=False, reason=f"Invalid request: {e}", error=Error(code="VALIDATION_ERROR", message=str(e)), ) return await self.handle_si_terminate_session(request, context) # ======================================================================== # Abstract handlers - Implement these in subclasses # ======================================================================== @abstractmethod async def handle_si_get_offering( self, request: SiGetOfferingRequest, context: ToolContext | None = None, ) -> SiGetOfferingResponse: """Handle get offering request.""" ... 
@abstractmethod async def handle_si_initiate_session( self, request: SiInitiateSessionRequest, context: ToolContext | None = None, ) -> SiInitiateSessionResponse: """Handle initiate session request.""" ... @abstractmethod async def handle_si_send_message( self, request: SiSendMessageRequest, context: ToolContext | None = None, ) -> SiSendMessageResponse: """Handle send message request.""" ... @abstractmethod async def handle_si_terminate_session( self, request: SiTerminateSessionRequest, context: ToolContext | None = None, ) -> SiTerminateSessionResponse: """Handle terminate session request.""" ...
Handler for Sponsored Intelligence protocol.
Subclass this to implement a Sponsored Intelligence agent. All SI operations must be implemented via the handle_* methods. The public methods (si_get_offering, etc.) handle validation and error handling automatically.
Non-SI operations (get_products, create_media_buy, content standards, etc.) return 'not supported' via the base class.
Example
class MySIHandler(SponsoredIntelligenceHandler):
    async def handle_si_get_offering(
        self, request: SiGetOfferingRequest, context: ToolContext | None = None
    ) -> SiGetOfferingResponse:
        # Your implementation
        return SiGetOfferingResponse(…)
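The split between the public si_* methods and the abstract handle_* methods can be illustrated with a standalone sketch. It uses only the standard library: OfferingRequest, its offering_id field, SketchSIHandler and the dict-shaped responses are hypothetical stand-ins, not the real adcp types, and the real class validates with Pydantic rather than the hand-written check below.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


@dataclass
class OfferingRequest:
    """Stand-in for SiGetOfferingRequest; the offering_id field is hypothetical."""

    offering_id: str

    @classmethod
    def validate(cls, params: "OfferingRequest | dict[str, Any]") -> "OfferingRequest":
        # Accept an already-built request, or check a raw dict by hand.
        if isinstance(params, cls):
            return params
        if not isinstance(params.get("offering_id"), str):
            raise ValueError("offering_id must be a string")
        return cls(offering_id=params["offering_id"])


class SketchSIHandler(ABC):
    """Mimics the validate-then-delegate pattern of the public si_* methods."""

    async def si_get_offering(self, params: "OfferingRequest | dict[str, Any]") -> dict:
        try:
            request = OfferingRequest.validate(params)
        except ValueError as e:
            # The real handler returns a NotImplementedResponse at this point.
            return {
                "supported": False,
                "reason": f"Invalid request: {e}",
                "error": {"code": "VALIDATION_ERROR", "message": str(e)},
            }
        return await self.handle_si_get_offering(request)

    @abstractmethod
    async def handle_si_get_offering(self, request: OfferingRequest) -> dict:
        """Business logic only; the request is already validated."""


class MySketchHandler(SketchSIHandler):
    async def handle_si_get_offering(self, request: OfferingRequest) -> dict:
        return {"offering_id": request.offering_id, "available": True}


ok = asyncio.run(MySketchHandler().si_get_offering({"offering_id": "off-1"}))
bad = asyncio.run(MySketchHandler().si_get_offering({}))
```

In this sketch the subclass never sees malformed input: the empty dict is turned into a VALIDATION_ERROR payload before handle_si_get_offering is reached.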
Ancestors
- ADCPHandler
- abc.ABC
Methods
async def handle_si_get_offering(self,
request: SiGetOfferingRequest,
context: ToolContext | None = None) -> SiGetOfferingResponse
    @abstractmethod
    async def handle_si_get_offering(
        self,
        request: SiGetOfferingRequest,
        context: ToolContext | None = None,
    ) -> SiGetOfferingResponse:
        """Handle get offering request."""
        ...
Handle get offering request.
async def handle_si_initiate_session(self,
request: SiInitiateSessionRequest,
context: ToolContext | None = None) -> SiInitiateSessionResponse
    @abstractmethod
    async def handle_si_initiate_session(
        self,
        request: SiInitiateSessionRequest,
        context: ToolContext | None = None,
    ) -> SiInitiateSessionResponse:
        """Handle initiate session request."""
        ...
Handle initiate session request.
async def handle_si_send_message(self,
request: SiSendMessageRequest,
context: ToolContext | None = None) -> SiSendMessageResponse
    @abstractmethod
    async def handle_si_send_message(
        self,
        request: SiSendMessageRequest,
        context: ToolContext | None = None,
    ) -> SiSendMessageResponse:
        """Handle send message request."""
        ...
Handle send message request.
async def handle_si_terminate_session(self,
request: SiTerminateSessionRequest,
context: ToolContext | None = None) -> SiTerminateSessionResponse
    @abstractmethod
    async def handle_si_terminate_session(
        self,
        request: SiTerminateSessionRequest,
        context: ToolContext | None = None,
    ) -> SiTerminateSessionResponse:
        """Handle terminate session request."""
        ...
Handle terminate session request.
async def si_get_offering(self,
params: SiGetOfferingRequest | dict[str, Any],
context: ToolContext | None = None) -> SiGetOfferingResponse | NotImplementedResponse
    async def si_get_offering(
        self,
        params: SiGetOfferingRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> SiGetOfferingResponse | NotImplementedResponse:
        """Get sponsored intelligence offering.

        Validates params and delegates to handle_si_get_offering.
        """
        try:
            request = SiGetOfferingRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_si_get_offering(request, context)
Get sponsored intelligence offering.
Validates params and delegates to handle_si_get_offering.
async def si_initiate_session(self,
params: SiInitiateSessionRequest | dict[str, Any],
context: ToolContext | None = None) -> SiInitiateSessionResponse | NotImplementedResponse
    async def si_initiate_session(
        self,
        params: SiInitiateSessionRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> SiInitiateSessionResponse | NotImplementedResponse:
        """Initiate sponsored intelligence session.

        Validates params and delegates to handle_si_initiate_session.
        """
        try:
            request = SiInitiateSessionRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_si_initiate_session(request, context)
Initiate sponsored intelligence session.
Validates params and delegates to handle_si_initiate_session.
async def si_send_message(self,
params: SiSendMessageRequest | dict[str, Any],
context: ToolContext | None = None) -> SiSendMessageResponse | NotImplementedResponse
    async def si_send_message(
        self,
        params: SiSendMessageRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> SiSendMessageResponse | NotImplementedResponse:
        """Send message in sponsored intelligence session.

        Validates params and delegates to handle_si_send_message.
        """
        try:
            request = SiSendMessageRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_si_send_message(request, context)
Send message in sponsored intelligence session.
Validates params and delegates to handle_si_send_message.
async def si_terminate_session(self,
params: SiTerminateSessionRequest | dict[str, Any],
context: ToolContext | None = None) -> SiTerminateSessionResponse | NotImplementedResponse
    async def si_terminate_session(
        self,
        params: SiTerminateSessionRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> SiTerminateSessionResponse | NotImplementedResponse:
        """Terminate sponsored intelligence session.

        Validates params and delegates to handle_si_terminate_session.
        """
        try:
            request = SiTerminateSessionRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_si_terminate_session(request, context)
Terminate sponsored intelligence session.
Validates params and delegates to handle_si_terminate_session.
Inherited members
- ADCPHandler: acquire_rights, activate_signal, build_creative, calibrate_content, check_governance, comply_test_controller, context_match, create_collection_list, create_content_standards, create_media_buy, create_property_list, delete_collection_list, delete_property_list, get_account_financials, get_adcp_capabilities, get_brand_identity, get_collection_list, get_content_standards, get_creative_delivery, get_creative_features, get_media_buy_artifacts, get_media_buy_delivery, get_media_buys, get_plan_audit_logs, get_products, get_property_list, get_rights, get_signals, identity_match, list_accounts, list_collection_lists, list_content_standards, list_creative_formats, list_creatives, list_property_lists, log_event, preview_creative, provide_performance_feedback, report_plan_outcome, report_usage, sync_accounts, sync_audiences, sync_catalogs, sync_creatives, sync_event_sources, sync_governance, sync_plans, update_collection_list, update_content_standards, update_media_buy, update_property_list, update_rights, validate_content_delivery
class TestControllerError (code: str, message: str, current_state: str | None = None)
    class TestControllerError(Exception):
        """Typed error for test controller store methods.

        Raise this from your TestControllerStore methods to return structured
        error responses. The dispatcher catches it and converts to the AdCP
        comply_test_controller error format.

        Example:
            async def force_media_buy_status(self, media_buy_id, status, rejection_reason=None):
                prev = self.media_buys.get(media_buy_id)
                if prev is None:
                    raise TestControllerError("NOT_FOUND", f"Media buy {media_buy_id} not found")
                if prev in ("completed", "rejected", "canceled"):
                    raise TestControllerError(
                        "INVALID_TRANSITION",
                        f"Cannot transition from {prev}",
                        current_state=prev,
                    )
                self.media_buys[media_buy_id] = status
                return {"previous_state": prev, "current_state": status}
        """

        def __init__(self, code: str, message: str, current_state: str | None = None):
            super().__init__(message)
            self.code = code
            self.current_state = current_state
Typed error for test controller store methods.
Raise this from your TestControllerStore methods to return structured error responses. The dispatcher catches it and converts to the AdCP comply_test_controller error format.
Example
async def force_media_buy_status(self, media_buy_id, status, rejection_reason=None):
    prev = self.media_buys.get(media_buy_id)
    if prev is None:
        raise TestControllerError("NOT_FOUND", f"Media buy {media_buy_id} not found")
    if prev in ("completed", "rejected", "canceled"):
        raise TestControllerError(
            "INVALID_TRANSITION",
            f"Cannot transition from {prev}",
            current_state=prev,
        )
    self.media_buys[media_buy_id] = status
    return {"previous_state": prev, "current_state": status}
Ancestors
- builtins.Exception
- builtins.BaseException
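To make the "dispatcher catches it and converts" step concrete, here is a minimal, self-contained sketch. TestControllerError is a local copy of the class shown above; InMemoryStore, dispatch and the keys of the error payload (success, error, error_detail) are assumptions made for illustration, since the actual comply_test_controller error format is not shown in this section.

```python
from __future__ import annotations

import asyncio
from typing import Any


class TestControllerError(Exception):
    """Local copy of the documented class so the sketch runs on its own."""

    def __init__(self, code: str, message: str, current_state: str | None = None):
        super().__init__(message)
        self.code = code
        self.current_state = current_state


class InMemoryStore:
    """Hypothetical store holding media buy statuses in a dict."""

    def __init__(self) -> None:
        self.media_buys: dict[str, str] = {"mb-1": "completed"}

    async def force_media_buy_status(self, media_buy_id, status, rejection_reason=None):
        prev = self.media_buys.get(media_buy_id)
        if prev is None:
            raise TestControllerError("NOT_FOUND", f"Media buy {media_buy_id} not found")
        if prev in ("completed", "rejected", "canceled"):
            raise TestControllerError(
                "INVALID_TRANSITION", f"Cannot transition from {prev}", current_state=prev
            )
        self.media_buys[media_buy_id] = status
        return {"previous_state": prev, "current_state": status}


async def dispatch(store: InMemoryStore, media_buy_id: str, status: str) -> dict[str, Any]:
    """Hypothetical dispatcher: the payload keys are assumptions, not the AdCP schema."""
    try:
        result = await store.force_media_buy_status(media_buy_id, status)
    except TestControllerError as e:
        payload: dict[str, Any] = {"success": False, "error": e.code, "error_detail": str(e)}
        if e.current_state is not None:
            payload["current_state"] = e.current_state
        return payload
    return {"success": True, **result}


store = InMemoryStore()
missing = asyncio.run(dispatch(store, "mb-404", "active"))
blocked = asyncio.run(dispatch(store, "mb-1", "active"))
```

Here the store only raises; turning the exception into a response dict is left entirely to the dispatcher, which is the division of labour the docstring describes.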
class TestControllerStore
    class TestControllerStore:
        """Base class for test controller state management.

        Subclass this and override the methods for scenarios your agent supports.
        Methods you don't override will be reported as unsupported scenarios
        and excluded from list_scenarios.

        Raise TestControllerError for structured error responses.
        """

        async def force_creative_status(
            self, creative_id: str, status: str, rejection_reason: str | None = None
        ) -> dict[str, Any]:
            """Force a creative to a given status.

            Returns: {"previous_state": str, "current_state": str}
            """
            raise NotImplementedError

        async def force_account_status(self, account_id: str, status: str) -> dict[str, Any]:
            """Force an account to a given status.

            Returns: {"previous_state": str, "current_state": str}
            """
            raise NotImplementedError

        async def force_media_buy_status(
            self, media_buy_id: str, status: str, rejection_reason: str | None = None
        ) -> dict[str, Any]:
            """Force a media buy to a given status.

            Returns: {"previous_state": str, "current_state": str}
            """
            raise NotImplementedError

        async def force_session_status(
            self, session_id: str, status: str, termination_reason: str | None = None
        ) -> dict[str, Any]:
            """Force a session to a given status.

            Returns: {"previous_state": str, "current_state": str}
            """
            raise NotImplementedError

        async def simulate_delivery(
            self,
            media_buy_id: str,
            impressions: int | None = None,
            clicks: int | None = None,
            conversions: int | None = None,
            reported_spend: dict[str, Any] | None = None,
        ) -> dict[str, Any]:
            """Simulate delivery metrics for a media buy.

            Returns: {"simulated": {...}, "cumulative": {...} | None}
            """
            raise NotImplementedError

        async def simulate_budget_spend(
            self,
            spend_percentage: float,
            account_id: str | None = None,
            media_buy_id: str | None = None,
        ) -> dict[str, Any]:
            """Simulate budget spend to a percentage.

            Returns: {"simulated": {...}}
            """
            raise NotImplementedError
Base class for test controller state management.
Subclass this and override the methods for scenarios your agent supports. Methods you don't override will be reported as unsupported scenarios and excluded from list_scenarios.
Raise TestControllerError for structured error responses.
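How the framework decides which methods count as overridden is not shown in this section. One plausible approach, sketched below with a two-method stand-in base class, is to compare each method on the subclass with the one defined on the base. SketchStore, SCENARIOS and this list_scenarios function are hypothetical names for illustration only.

```python
from __future__ import annotations

from typing import Any


class SketchStore:
    """Stand-in base class with two of the documented scenario methods."""

    async def force_account_status(self, account_id: str, status: str) -> dict[str, Any]:
        raise NotImplementedError

    async def simulate_delivery(
        self, media_buy_id: str, impressions: int | None = None
    ) -> dict[str, Any]:
        raise NotImplementedError


# Scenario names checked by the sketch (an assumed subset).
SCENARIOS = ("force_account_status", "simulate_delivery")


def list_scenarios(store: SketchStore) -> list[str]:
    """Return the scenario names whose method the subclass has overridden."""
    return [
        name
        for name in SCENARIOS
        # An inherited method is the very same function object as the base's.
        if getattr(type(store), name) is not getattr(SketchStore, name)
    ]


class AccountsOnly(SketchStore):
    async def force_account_status(self, account_id: str, status: str) -> dict[str, Any]:
        return {"previous_state": "active", "current_state": status}


supported = list_scenarios(AccountsOnly())
none_supported = list_scenarios(SketchStore())
```

With this approach a subclass opts into a scenario simply by defining the method; no registration step is needed.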
Methods
async def force_account_status(self, account_id: str, status: str) -> dict[str, typing.Any]
    async def force_account_status(self, account_id: str, status: str) -> dict[str, Any]:
        """Force an account to a given status.

        Returns: {"previous_state": str, "current_state": str}
        """
        raise NotImplementedError
Force an account to a given status.
Returns
{"previous_state": str, "current_state": str}
async def force_creative_status(self, creative_id: str, status: str, rejection_reason: str | None = None) -> dict[str, typing.Any]
    async def force_creative_status(
        self, creative_id: str, status: str, rejection_reason: str | None = None
    ) -> dict[str, Any]:
        """Force a creative to a given status.

        Returns: {"previous_state": str, "current_state": str}
        """
        raise NotImplementedError
Force a creative to a given status.
Returns
{"previous_state": str, "current_state": str}
async def force_media_buy_status(self, media_buy_id: str, status: str, rejection_reason: str | None = None) -> dict[str, typing.Any]
    async def force_media_buy_status(
        self, media_buy_id: str, status: str, rejection_reason: str | None = None
    ) -> dict[str, Any]:
        """Force a media buy to a given status.

        Returns: {"previous_state": str, "current_state": str}
        """
        raise NotImplementedError
Force a media buy to a given status.
Returns
{"previous_state": str, "current_state": str}
async def force_session_status(self, session_id: str, status: str, termination_reason: str | None = None) -> dict[str, typing.Any]
    async def force_session_status(
        self, session_id: str, status: str, termination_reason: str | None = None
    ) -> dict[str, Any]:
        """Force a session to a given status.

        Returns: {"previous_state": str, "current_state": str}
        """
        raise NotImplementedError
Force a session to a given status.
Returns
{"previous_state": str, "current_state": str}
async def simulate_budget_spend(self,
spend_percentage: float,
account_id: str | None = None,
media_buy_id: str | None = None) -> dict[str, typing.Any]
    async def simulate_budget_spend(
        self,
        spend_percentage: float,
        account_id: str | None = None,
        media_buy_id: str | None = None,
    ) -> dict[str, Any]:
        """Simulate budget spend to a percentage.

        Returns: {"simulated": {...}}
        """
        raise NotImplementedError
Simulate budget spend to a percentage.
Returns
{"simulated": {…}}
async def simulate_delivery(self,
media_buy_id: str,
impressions: int | None = None,
clicks: int | None = None,
conversions: int | None = None,
reported_spend: dict[str, Any] | None = None) -> dict[str, typing.Any]
    async def simulate_delivery(
        self,
        media_buy_id: str,
        impressions: int | None = None,
        clicks: int | None = None,
        conversions: int | None = None,
        reported_spend: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Simulate delivery metrics for a media buy.

        Returns: {"simulated": {...}, "cumulative": {...} | None}
        """
        raise NotImplementedError
Simulate delivery metrics for a media buy.
Returns
{"simulated": {…}, "cumulative": {…} | None}
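As an illustration of the documented return shape, the sketch below keeps running totals per media buy in memory. DeliveryStore is a hypothetical standalone class (a real implementation would subclass TestControllerStore), and the keys inside "simulated" and "cumulative" are assumptions, since the source elides them.

```python
from __future__ import annotations

import asyncio
from typing import Any


class DeliveryStore:
    """Illustrative in-memory store; inner dict keys are assumed, not specified."""

    def __init__(self) -> None:
        # Running totals per media buy, e.g. {"mb-1": {"impressions": 1500}}.
        self.totals: dict[str, dict[str, int]] = {}

    async def simulate_delivery(
        self,
        media_buy_id: str,
        impressions: int | None = None,
        clicks: int | None = None,
        conversions: int | None = None,
        reported_spend: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        requested = {"impressions": impressions, "clicks": clicks, "conversions": conversions}
        # Only metrics the caller actually supplied are simulated.
        simulated: dict[str, Any] = {k: v for k, v in requested.items() if v is not None}

        totals = self.totals.setdefault(media_buy_id, {})
        for metric, value in simulated.items():
            totals[metric] = totals.get(metric, 0) + value

        # reported_spend has an unknown shape here, so it is echoed back untouched.
        if reported_spend is not None:
            simulated["reported_spend"] = reported_spend
        return {"simulated": simulated, "cumulative": dict(totals)}


store = DeliveryStore()
first = asyncio.run(store.simulate_delivery("mb-1", impressions=1000, clicks=10))
second = asyncio.run(store.simulate_delivery("mb-1", impressions=500))
```

After the second call, "simulated" reflects only that call while "cumulative" carries the totals across both.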
class TmpHandler
    class TmpHandler(ADCPHandler):
        """Handler for Temporal Matching Protocol operations.

        Subclass this to implement context matching and identity matching.
        Only TMP tools will be exposed via MCP.

        Example:
            class MyTmpAgent(TmpHandler):
                async def context_match(self, params, context=None):
                    # Evaluate context signals against buyer packages
                    pass

                async def identity_match(self, params, context=None):
                    # Evaluate user identity for package eligibility
                    pass
        """

        _agent_type = "TMP agents"
Handler for Temporal Matching Protocol operations.
Subclass this to implement context matching and identity matching. Only TMP tools will be exposed via MCP.
Example
class MyTmpAgent(TmpHandler):
    async def context_match(self, params, context=None):
        # Evaluate context signals against buyer packages
        pass

    async def identity_match(self, params, context=None):
        # Evaluate user identity for package eligibility
        pass
Ancestors
- ADCPHandler
- abc.ABC
Inherited members
- ADCPHandler: acquire_rights, activate_signal, build_creative, calibrate_content, check_governance, comply_test_controller, context_match, create_collection_list, create_content_standards, create_media_buy, create_property_list, delete_collection_list, delete_property_list, get_account_financials, get_adcp_capabilities, get_brand_identity, get_collection_list, get_content_standards, get_creative_delivery, get_creative_features, get_media_buy_artifacts, get_media_buy_delivery, get_media_buys, get_plan_audit_logs, get_products, get_property_list, get_rights, get_signals, identity_match, list_accounts, list_collection_lists, list_content_standards, list_creative_formats, list_creatives, list_property_lists, log_event, preview_creative, provide_performance_feedback, report_plan_outcome, report_usage, si_get_offering, si_initiate_session, si_send_message, si_terminate_session, sync_accounts, sync_audiences, sync_catalogs, sync_creatives, sync_event_sources, sync_governance, sync_plans, update_collection_list, update_content_standards, update_media_buy, update_property_list, update_rights, validate_content_delivery
class ToolContext (request_id: str | None = None,
caller_identity: str | None = None,
metadata: dict[str, Any] = <factory>)
    @dataclass
    class ToolContext:
        """Context passed to tool handlers.

        Contains metadata about the current request that may be useful for
        logging, authorization, or other cross-cutting concerns.

        :param caller_identity: The authenticated principal making the request.
            **MUST** be a stable, globally-unique identifier within the
            seller's tenant, never an email, display name, or any other
            mutable handle. The server-side idempotency middleware keys its
            cache by ``(caller_identity, idempotency_key)``; reuse of the
            same string for two distinct principals (e.g. email reuse after
            account deletion) causes cross-principal replay (confidentiality
            leak). Populated by the transport layer (A2A:
            ``ServerCallContext.user.user_name``; MCP: seller's FastMCP auth
            middleware).
        """

        request_id: str | None = None
        caller_identity: str | None = None
        metadata: dict[str, Any] = field(default_factory=dict)
Context passed to tool handlers.
Contains metadata about the current request that may be useful for logging, authorization, or other cross-cutting concerns.
:param caller_identity: The authenticated principal making the request. MUST be a stable, globally-unique identifier within the seller's tenant, never an email, display name, or any other mutable handle. The server-side idempotency middleware keys its cache by (caller_identity, idempotency_key), so reusing the same string for two distinct principals (e.g. email reuse after account deletion) causes cross-principal replay (a confidentiality leak). Populated by the transport layer (A2A: ServerCallContext.user.user_name; MCP: seller's FastMCP auth middleware).
Instance variables
var caller_identity : str | None
var metadata : dict[str, typing.Any]
var request_id : str | None
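The replay hazard described for caller_identity can be demonstrated with a toy cache. ToolContext below is a local copy of the dataclass shown above; IdempotencyCache and get_or_store are hypothetical and only mimic the documented keying by (caller_identity, idempotency_key), not the framework's actual middleware.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToolContext:
    """Local copy of the documented dataclass so the sketch runs on its own."""

    request_id: str | None = None
    caller_identity: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)


class IdempotencyCache:
    """Hypothetical cache keyed by (caller_identity, idempotency_key)."""

    def __init__(self) -> None:
        self._responses: dict[tuple[str, str], Any] = {}

    def get_or_store(self, context: ToolContext, idempotency_key: str, response: Any) -> Any:
        if context.caller_identity is None:
            # No authenticated principal: nothing safe to key on, so skip caching.
            return response
        key = (context.caller_identity, idempotency_key)
        # The first response stored under a key is replayed for every later call.
        return self._responses.setdefault(key, response)


cache = IdempotencyCache()

# Stable opaque identifier: a retry by the same principal replays the first response.
buyer = ToolContext(caller_identity="principal-7f3a")
first = cache.get_or_store(buyer, "key-1", {"media_buy_id": "mb-1"})
retry = cache.get_or_store(buyer, "key-1", {"media_buy_id": "mb-2"})

# Mutable handle: a recycled email makes two different people share one cache key.
old_owner = ToolContext(caller_identity="pat@example.com")
new_owner = ToolContext(caller_identity="pat@example.com")
cache.get_or_store(old_owner, "key-9", {"secret": "old owner's data"})
leaked = cache.get_or_store(new_owner, "key-9", {"secret": "new owner's data"})
```

In the second half of the sketch the new owner of the address receives the previous owner's cached response, which is the cross-principal replay the docstring warns about.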