Module adcp.server

ADCP Server Framework.

Build an AdCP agent in minutes. The framework handles the protocol plumbing so you focus on business logic.

Quickstart (class-based)::

from adcp.server import ADCPHandler, serve
from adcp.server.responses import capabilities_response, products_response

class MySeller(ADCPHandler):
    async def get_adcp_capabilities(self, params, context=None):
        return capabilities_response(["media_buy"])

    async def get_products(self, params, context=None):
        return products_response(MY_PRODUCTS)

serve(MySeller(), name="my-seller")

Quickstart (decorator-based)::

from adcp.server import adcp_server, serve
from adcp.server.responses import products_response

server = adcp_server("my-seller")

@server.get_products
async def get_products(params, context=None):
    return products_response(MY_PRODUCTS)

serve(server, name="my-seller")  # capabilities auto-generated

What the framework does automatically:

  • Error responses: adcp_error("BUDGET_TOO_LOW") auto-populates recovery classification (transient/correctable/terminal) from 20+ standard codes.
  • State transitions: media_buy_response(..., status="active") auto-populates valid_actions from the status. No manual mapping.
  • Account resolution: resolve_account(params, my_resolver) auto-resolves AccountReference and returns ACCOUNT_NOT_FOUND errors.
  • Context passthrough: inject_context(params, response) echoes the request context field back in the response (ADCP requirement).
  • Cancellation: cancel_media_buy_response(id, "buyer") auto-sets canceled_at, status, and valid_actions=[].
  • Capabilities: The decorator builder auto-generates get_adcp_capabilities from which handlers you register.
  • Validation: GovernanceHandler and ContentStandardsHandler auto-validate request dicts into Pydantic models before your handler code runs.

Sub-modules

adcp.server.a2a_server

A2A server support for ADCP handlers …

adcp.server.base

Base classes for ADCP server implementations …

adcp.server.brand

Brand rights handler for ADCP server implementations.

adcp.server.builder

Decorator-based server builder for ADCP …

adcp.server.compliance

Compliance test controller handler for ADCP server implementations.

adcp.server.content_standards

Content Standards protocol handler …

adcp.server.governance

Governance protocol handler …

adcp.server.helpers

DX helpers for ADCP server builders …

adcp.server.idempotency

Server-side idempotency middleware for AdCP mutating tool handlers …

adcp.server.mcp_tools

MCP server integration helpers …

adcp.server.proposal

Proposal generation helpers …

adcp.server.responses

Response builder helpers for ADCP servers …

adcp.server.sponsored_intelligence

Sponsored Intelligence protocol handler …

adcp.server.test_controller

Built-in comply_test_controller for ADCP servers …

adcp.server.tmp

TMP (Temporal Matching Protocol) handler for ADCP server implementations.

adcp.server.translate

Error translation and request normalization for proxy and custom-transport servers …

Functions

def activate_signal_response(deployments: list[dict[str, Any]], *, sandbox: bool = True) ‑> dict[str, typing.Any]
def activate_signal_response(
    deployments: list[dict[str, Any]],
    *,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build an activate_signal success response.

    Each deployment should include: type, is_live, activation_key.
    For platform: platform, account.
    For agent: agent_url.
    Matches ActivateSignalResponse1 (success) schema.
    """
    return {
        "deployments": deployments,
        "sandbox": sandbox,
    }

Build an activate_signal success response.

Each deployment should include: type, is_live, activation_key. For platform: platform, account. For agent: agent_url. Matches ActivateSignalResponse1 (success) schema.

def adcp_error(code: str,
message: str | None = None,
*,
field: str | None = None,
suggestion: str | None = None,
recovery: str | None = None,
retry_after: int | None = None,
details: dict[str, str | int | float | bool | None] | None = None) ‑> dict[str, typing.Any]
def adcp_error(
    code: str,
    message: str | None = None,
    *,
    field: str | None = None,
    suggestion: str | None = None,
    recovery: str | None = None,
    retry_after: int | None = None,
    details: dict[str, str | int | float | bool | None] | None = None,
) -> dict[str, Any]:
    """Build a structured ADCP error response with auto-recovery.

    Standard codes get recovery auto-populated from the code table.
    Custom codes default to "terminal".

    Args:
        code: Error code (e.g., "BUDGET_TOO_LOW").
        message: Human-readable message. Defaults to standard message.
        field: Which request field caused the error.
        suggestion: Actionable fix suggestion.
        recovery: Override ("transient", "correctable", "terminal").
        retry_after: Seconds to wait (for RATE_LIMITED).
        details: Server-generated debugging data (constraint names, limits,
            thresholds). Use only server-generated values here. NEVER pass
            request params or user-supplied strings -- they flow to the
            caller's LLM context and could enable prompt injection.
    """
    std = STANDARD_ERROR_CODES.get(code, {})
    err: dict[str, Any] = {
        "code": code,
        "message": message or std.get("message", code),
        "recovery": recovery or std.get("recovery", "terminal"),
    }
    if field is not None:
        err["field"] = field
    if suggestion is not None:
        err["suggestion"] = suggestion
    if retry_after is not None:
        err["retry_after"] = retry_after
    if details is not None:
        err["details"] = details
    return {"errors": [err]}

Build a structured ADCP error response with auto-recovery.

Standard codes get recovery auto-populated from the code table. Custom codes default to "terminal".

Args

code
Error code (e.g., "BUDGET_TOO_LOW").
message
Human-readable message. Defaults to standard message.
field
Which request field caused the error.
suggestion
Actionable fix suggestion.
recovery
Override ("transient", "correctable", "terminal").
retry_after
Seconds to wait (for RATE_LIMITED).
details
Server-generated debugging data (constraint names, limits, thresholds). Use only server-generated values here. NEVER pass request params or user-supplied strings – they flow to the caller's LLM context and could enable prompt injection.
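
The lookup order described above (explicit argument, then the standard-code table, then a fallback) can be sketched without the package installed. The function below is a local stand-in that follows the listed source; the two entries in `STANDARD_ERROR_CODES` and the custom code name are illustrative assumptions, since the real table contents are not shown on this page.

```python
from __future__ import annotations

from typing import Any

# Hypothetical two-entry table for illustration only; the real
# STANDARD_ERROR_CODES contents are not shown on this page.
STANDARD_ERROR_CODES: dict[str, dict[str, str]] = {
    "RATE_LIMITED": {"message": "Too many requests", "recovery": "transient"},
    "BUDGET_TOO_LOW": {"message": "Budget is below the minimum", "recovery": "correctable"},
}


def adcp_error_sketch(
    code: str,
    message: str | None = None,
    *,
    recovery: str | None = None,
    retry_after: int | None = None,
) -> dict[str, Any]:
    """Local stand-in that follows the lookup order of the listed source."""
    std = STANDARD_ERROR_CODES.get(code, {})
    err: dict[str, Any] = {
        "code": code,
        # Explicit arguments win, then the table, then the fallback.
        "message": message or std.get("message", code),
        "recovery": recovery or std.get("recovery", "terminal"),
    }
    if retry_after is not None:
        err["retry_after"] = retry_after
    return {"errors": [err]}


rate_limited = adcp_error_sketch("RATE_LIMITED", retry_after=30)
custom = adcp_error_sketch("INVENTORY_UNAVAILABLE")  # made-up custom code
overridden = adcp_error_sketch("INVENTORY_UNAVAILABLE", recovery="transient")
```

A code that is missing from the table uses the code itself as the message and "terminal" as the recovery, unless the caller overrides either one.
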
def adcp_server(name: str, **kwargs: Any) ‑> ADCPServerBuilder
def adcp_server(name: str, **kwargs: Any) -> ADCPServerBuilder:
    """Create a decorator-based ADCP server builder.

    Args:
        name: Server name.
        **kwargs: Additional configuration (e.g., version="1.0.0").

    Returns:
        An ADCPServerBuilder instance.
    """
    return ADCPServerBuilder(name, **kwargs)

Create a decorator-based ADCP server builder.

Args

name
Server name.
**kwargs
Additional configuration (e.g., version="1.0.0").

Returns

An ADCPServerBuilder instance.

def build_creative_response(creative_manifest: dict[str, Any] | list[dict[str, Any]],
*,
sandbox: bool = True) ‑> dict[str, typing.Any]
def build_creative_response(
    creative_manifest: dict[str, Any] | list[dict[str, Any]],
    *,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a build_creative success response.

    Accepts either a single manifest dict or a list of manifests.
    Each manifest should include: format_id, name, assets.

    Single manifest matches BuildCreativeResponse1.
    List matches BuildCreativeResponse2 (multi-format).
    """
    if isinstance(creative_manifest, list):
        return {
            "creative_manifests": creative_manifest,
            "sandbox": sandbox,
        }
    return {
        "creative_manifest": creative_manifest,
        "sandbox": sandbox,
    }

Build a build_creative success response.

Accepts either a single manifest dict or a list of manifests. Each manifest should include: format_id, name, assets.

Single manifest matches BuildCreativeResponse1. List matches BuildCreativeResponse2 (multi-format).
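
As a rough illustration of the single-versus-list behaviour, the stand-in below follows the listed source. The manifest values (the agent URL, the format id, and the empty `assets` mapping) are placeholders chosen for this sketch, not values taken from the AdCP schema.

```python
from __future__ import annotations

from typing import Any


def build_creative_sketch(
    creative_manifest: dict[str, Any] | list[dict[str, Any]],
) -> dict[str, Any]:
    """Local stand-in: the output key depends on the argument type."""
    if isinstance(creative_manifest, list):
        return {"creative_manifests": creative_manifest, "sandbox": True}
    return {"creative_manifest": creative_manifest, "sandbox": True}


# Placeholder manifest; the field values are assumptions for illustration.
manifest = {
    "format_id": {"agent_url": "https://creative.example.com", "id": "display_300x250"},
    "name": "Spring banner",
    "assets": {},
}

single = build_creative_sketch(manifest)
multi = build_creative_sketch([manifest, manifest])
```

Passing a one-element list still produces the plural `creative_manifests` key, so callers that want the single-manifest shape need to pass the dict itself.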

def cancel_media_buy_response(media_buy_id: str,
canceled_by: str,
*,
reason: str | None = None,
canceled_at: str | None = None,
affected_packages: list[Any] | None = None,
revision: int | None = None,
sandbox: bool = True) ‑> dict[str, typing.Any]
def cancel_media_buy_response(
    media_buy_id: str,
    canceled_by: str,
    *,
    reason: str | None = None,
    canceled_at: str | None = None,
    affected_packages: list[Any] | None = None,
    revision: int | None = None,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a cancellation response with auto-defaults.

    Auto-sets canceled_at to now, status to "canceled", valid_actions to [].
    Requires canceled_by ("buyer" or "seller") - the field developers
    most commonly forget.
    """
    if canceled_by not in ("buyer", "seller"):
        raise ValueError(f"canceled_by must be 'buyer' or 'seller', got {canceled_by!r}")
    resp: dict[str, Any] = {
        "media_buy_id": media_buy_id,
        "status": "canceled",
        "canceled_by": canceled_by,
        "canceled_at": canceled_at or datetime.now(timezone.utc).isoformat(),
        "valid_actions": [],
        "sandbox": sandbox,
    }
    if reason is not None:
        resp["reason"] = reason
    if affected_packages is not None:
        resp["affected_packages"] = affected_packages
    if revision is not None:
        resp["revision"] = revision
    return resp

Build a cancellation response with auto-defaults.

Auto-sets canceled_at to now, status to "canceled", valid_actions to []. Requires canceled_by ("buyer" or "seller") - the field developers most commonly forget.
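
The defaults and the `canceled_by` check can be tried with a local stand-in that follows the listed source (optional fields are omitted for brevity). The media buy id is a made-up value.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any


def cancel_sketch(
    media_buy_id: str,
    canceled_by: str,
    *,
    canceled_at: str | None = None,
) -> dict[str, Any]:
    """Local stand-in covering the required fields of the listed source."""
    if canceled_by not in ("buyer", "seller"):
        raise ValueError(f"canceled_by must be 'buyer' or 'seller', got {canceled_by!r}")
    return {
        "media_buy_id": media_buy_id,
        "status": "canceled",
        "canceled_by": canceled_by,
        # Defaults to the current UTC time in ISO 8601 form.
        "canceled_at": canceled_at or datetime.now(timezone.utc).isoformat(),
        "valid_actions": [],
    }


resp = cancel_sketch("mb_123", "buyer")

# Any value other than "buyer" or "seller" is rejected up front.
try:
    cancel_sketch("mb_123", "publisher")
    rejected = False
except ValueError:
    rejected = True
```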

def capabilities_response(supported_protocols: list[str],
*,
major_versions: list[int] | None = None,
sandbox: bool = True,
features: dict[str, Any] | None = None,
idempotency: dict[str, Any] | None = None,
compliance_testing: dict[str, Any] | None = None) ‑> dict[str, typing.Any]
def capabilities_response(
    supported_protocols: list[str],
    *,
    major_versions: list[int] | None = None,
    sandbox: bool = True,
    features: dict[str, Any] | None = None,
    idempotency: dict[str, Any] | None = None,
    compliance_testing: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build a get_adcp_capabilities response.

    Args:
        supported_protocols: e.g. ["media_buy"], ["media_buy", "signals"].
            Valid values: media_buy, signals, governance, creative, brand,
            sponsored_intelligence. ``compliance_testing`` is NOT a protocol —
            pass it via the ``compliance_testing`` kwarg.
        major_versions: AdCP major versions. Defaults to [3].
        sandbox: Whether this is a sandbox agent. Defaults to True.
        features: Additional feature flags.
        idempotency: Optional idempotency declaration, nested under
            ``adcp.idempotency`` per AdCP #2315. Pass the output of
            :meth:`adcp.server.idempotency.IdempotencyStore.capability` here
            to declare the seller's ``replay_ttl_seconds``.
        compliance_testing: Optional top-level ``compliance_testing`` block
            to advertise compliance-testing capabilities. When provided,
            emitted as a sibling of ``adcp`` in the response.

    Example::

        from adcp.server.responses import capabilities_response
        from adcp.server.idempotency import IdempotencyStore, MemoryBackend

        store = IdempotencyStore(backend=MemoryBackend(), ttl_seconds=86400)
        return capabilities_response(
            ["media_buy"],
            idempotency=store.capability(),
        )
    """
    adcp_info: dict[str, Any] = {"major_versions": major_versions or [3]}
    if idempotency:
        adcp_info["idempotency"] = idempotency
    resp: dict[str, Any] = {
        "adcp": adcp_info,
        "supported_protocols": supported_protocols,
        "sandbox": sandbox,
    }
    if features:
        resp["features"] = features
    if compliance_testing is not None:
        resp["compliance_testing"] = compliance_testing
    return resp

Build a get_adcp_capabilities response.

Args

supported_protocols
e.g. ["media_buy"], ["media_buy", "signals"]. Valid values: media_buy, signals, governance, creative, brand, sponsored_intelligence. compliance_testing is NOT a protocol — pass it via the compliance_testing kwarg.
major_versions
AdCP major versions. Defaults to [3].
sandbox
Whether this is a sandbox agent. Defaults to True.
features
Additional feature flags.
idempotency
Optional idempotency declaration, nested under adcp.idempotency per AdCP #2315. Pass the output of IdempotencyStore.capability() here to declare the seller's replay_ttl_seconds.
compliance_testing
Optional top-level compliance_testing block to advertise compliance-testing capabilities. When provided, emitted as a sibling of adcp in the response.

Example::

from adcp.server.responses import capabilities_response
from adcp.server.idempotency import IdempotencyStore, MemoryBackend

store = IdempotencyStore(backend=MemoryBackend(), ttl_seconds=86400)
return capabilities_response(
    ["media_buy"],
    idempotency=store.capability(),
)
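
To make the nesting concrete, the stand-in below follows the listed source and prints nothing; it only builds the dict. The `{"replay_ttl_seconds": 86400}` value is an assumed shape for what `IdempotencyStore.capability()` returns, and the empty `compliance_testing` block is a placeholder.

```python
from __future__ import annotations

from typing import Any


def capabilities_sketch(
    supported_protocols: list[str],
    *,
    major_versions: list[int] | None = None,
    idempotency: dict[str, Any] | None = None,
    compliance_testing: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Local stand-in that follows the nesting rules of the listed source."""
    adcp_info: dict[str, Any] = {"major_versions": major_versions or [3]}
    if idempotency:  # truthiness check: an empty dict is dropped
        adcp_info["idempotency"] = idempotency
    resp: dict[str, Any] = {
        "adcp": adcp_info,
        "supported_protocols": supported_protocols,
        "sandbox": True,
    }
    if compliance_testing is not None:  # None check: an empty dict is kept
        resp["compliance_testing"] = compliance_testing
    return resp


resp = capabilities_sketch(
    ["media_buy"],
    idempotency={"replay_ttl_seconds": 86400},  # assumed shape
    compliance_testing={},  # placeholder block
)
empty_idempotency = capabilities_sketch(["media_buy"], idempotency={})
```

Note the asymmetry in the listed source: an empty `idempotency` dict is omitted, while an empty `compliance_testing` dict is still emitted as a sibling of `adcp`.
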
def create_a2a_server(handler: ADCPHandler,
*,
name: str = 'adcp-agent',
port: int | None = None,
description: str | None = None,
version: str = '1.0.0',
test_controller: TestControllerStore | None = None) ‑> Any
def create_a2a_server(
    handler: ADCPHandler,
    *,
    name: str = "adcp-agent",
    port: int | None = None,
    description: str | None = None,
    version: str = "1.0.0",
    test_controller: TestControllerStore | None = None,
) -> Any:
    """Create an A2A Starlette application from an ADCP handler.

    Args:
        handler: An ADCPHandler subclass instance.
        name: Agent name shown in the A2A agent card.
        port: Port number (used in the agent card URL).
        description: Agent description for the agent card.
        version: Agent version string.
        test_controller: Optional TestControllerStore for storyboard testing.

    Returns:
        A Starlette app ready to be run with uvicorn.
    """
    from a2a.server.apps.jsonrpc.starlette_app import A2AStarletteApplication

    resolved_port = port or int(os.environ.get("PORT", "3001"))

    executor = ADCPAgentExecutor(handler, test_controller=test_controller)

    agent_card = _build_agent_card(
        handler,
        name=name,
        port=resolved_port,
        description=description,
        version=version,
        extra_skills=_test_controller_skills() if test_controller else None,
    )

    task_store = InMemoryTaskStore()

    request_handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=task_store,
    )

    a2a_app = A2AStarletteApplication(
        agent_card=agent_card,
        http_handler=request_handler,
    )

    return a2a_app.build()

Create an A2A Starlette application from an ADCP handler.

Args

handler
An ADCPHandler subclass instance.
name
Agent name shown in the A2A agent card.
port
Port number (used in the agent card URL).
description
Agent description for the agent card.
version
Agent version string.
test_controller
Optional TestControllerStore for storyboard testing.

Returns

A Starlette app ready to be run with uvicorn.

def create_mcp_server(handler: ADCPHandler,
*,
name: str = 'adcp-agent',
port: int | None = None,
instructions: str | None = None,
include_test_controller: bool = False) ‑> Any
def create_mcp_server(
    handler: ADCPHandler,
    *,
    name: str = "adcp-agent",
    port: int | None = None,
    instructions: str | None = None,
    include_test_controller: bool = False,
) -> Any:
    """Create a FastMCP server from an ADCP handler without starting it.

    Use this when you need to customize the server before running it,
    or when you need to add extra non-ADCP tools.

    Args:
        handler: An ADCPHandler subclass instance.
        name: Server name.
        port: Port to listen on.
        instructions: Optional system instructions.
        include_test_controller: When False (default), skip registering
            ``comply_test_controller`` as a handler tool. Sellers who want
            compliance-testing support should pass ``test_controller=`` to
            :func:`serve`, which registers a store-backed implementation
            via :func:`register_test_controller` and sets this flag
            implicitly. Registering the handler stub unconditionally would
            advertise a tool the seller didn't opt into.

    Returns:
        A configured FastMCP server instance. Call mcp.run() to start.

    Example:
        mcp = create_mcp_server(MyAgent(), name="my-agent")
        mcp.run(transport="streamable-http")
    """
    from mcp.server.fastmcp import FastMCP

    resolved_port = port or int(os.environ.get("PORT", "3001"))
    mcp = FastMCP(name, instructions=instructions, port=resolved_port)
    _register_handler_tools(mcp, handler, include_test_controller=include_test_controller)
    return mcp

Create a FastMCP server from an ADCP handler without starting it.

Use this when you need to customize the server before running it, or when you need to add extra non-ADCP tools.

Args

handler
An ADCPHandler subclass instance.
name
Server name.
port
Port to listen on.
instructions
Optional system instructions.
include_test_controller
When False (default), skip registering comply_test_controller as a handler tool. Sellers who want compliance-testing support should pass test_controller= to serve(), which registers a store-backed implementation via register_test_controller() and sets this flag implicitly. Registering the handler stub unconditionally would advertise a tool the seller didn't opt into.

Returns

A configured FastMCP server instance. Call mcp.run() to start.

Example

mcp = create_mcp_server(MyAgent(), name="my-agent")
mcp.run(transport="streamable-http")
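
Both `create_a2a_server` and `create_mcp_server` resolve the port with the same expression in the listed source. The helper below isolates that expression so its behaviour can be checked; `resolve_port` is a name made up for this sketch and is not part of the package.

```python
from __future__ import annotations

import os


def resolve_port(port: int | None = None) -> int:
    """Same expression as the listed source: argument, then $PORT, then 3001."""
    return port or int(os.environ.get("PORT", "3001"))


os.environ.pop("PORT", None)
default_port = resolve_port()      # no argument, no $PORT

os.environ["PORT"] = "8080"
env_port = resolve_port()          # falls back to $PORT
explicit_port = resolve_port(9000) # explicit argument wins
zero_port = resolve_port(0)        # 0 is falsy, so $PORT is used instead
```

Because the fallback uses `or`, passing `port=0` does not request an OS-assigned port; it is treated the same as passing nothing.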

def create_mcp_tools(handler: ADCPHandler) ‑> MCPToolSet
def create_mcp_tools(handler: ADCPHandler) -> MCPToolSet:
    """Create MCP tools from an ADCP handler.

    This is the main entry point for MCP server integration.

    Example with mcp library:
        from mcp.server import Server
        from adcp.server import ContentStandardsHandler, create_mcp_tools

        class MyHandler(ContentStandardsHandler):
            ...  # implement methods

        handler = MyHandler()
        tools = create_mcp_tools(handler)

        server = Server("my-content-agent")

        @server.list_tools()
        async def list_tools():
            return tools.tool_definitions

        @server.call_tool()
        async def call_tool(name: str, arguments: dict):
            return await tools.call_tool(name, arguments)

    Args:
        handler: ADCP handler instance

    Returns:
        MCPToolSet with tool definitions and handlers
    """
    return MCPToolSet(handler)

Create MCP tools from an ADCP handler.

This is the main entry point for MCP server integration.

Example with mcp library:

from mcp.server import Server
from adcp.server import ContentStandardsHandler, create_mcp_tools

class MyHandler(ContentStandardsHandler):
    ...  # implement methods

handler = MyHandler()
tools = create_mcp_tools(handler)

server = Server("my-content-agent")

@server.list_tools()
async def list_tools():
    return tools.tool_definitions

@server.call_tool()
async def call_tool(name: str, arguments: dict):
    return await tools.call_tool(name, arguments)

Args

handler
ADCP handler instance

Returns

MCPToolSet with tool definitions and handlers

def creative_formats_response(formats: list[Any], *, sandbox: bool = True) ‑> dict[str, typing.Any]
def creative_formats_response(
    formats: list[Any],
    *,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a list_creative_formats response.

    Each format should include: format_id ({agent_url, id}), name.
    Matches ListCreativeFormatsResponse schema.
    """
    return {
        "formats": _serialize(formats),
        "sandbox": sandbox,
    }

Build a list_creative_formats response.

Each format should include: format_id ({agent_url, id}), name. Matches ListCreativeFormatsResponse schema.

def delivery_response(media_buy_deliveries: list[dict[str, Any]],
*,
reporting_period: dict[str, str] | None = None,
currency: str = 'USD',
sandbox: bool = True) ‑> dict[str, typing.Any]
def delivery_response(
    media_buy_deliveries: list[dict[str, Any]],
    *,
    reporting_period: dict[str, str] | None = None,
    currency: str = "USD",
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a get_media_buy_delivery response.

    Each media_buy_delivery should include:
        media_buy_id, status, totals (impressions, spend, etc.), by_package.

    Matches GetMediaBuyDeliveryResponse schema.

    Args:
        media_buy_deliveries: Array of delivery data per media buy.
        reporting_period: {"start": ISO timestamp, "end": ISO timestamp}.
            Defaults to current timestamp for both.
        currency: ISO 4217 currency code.
        sandbox: Whether this is simulated data.
    """
    now = datetime.now(timezone.utc).isoformat()
    return {
        "reporting_period": reporting_period or {"start": now, "end": now},
        "media_buy_deliveries": media_buy_deliveries,
        "currency": currency,
        "sandbox": sandbox,
    }

Build a get_media_buy_delivery response.

Each media_buy_delivery should include: media_buy_id, status, totals (impressions, spend, etc.), by_package.

Matches GetMediaBuyDeliveryResponse schema.

Args

media_buy_deliveries
Array of delivery data per media buy.
reporting_period
{"start": ISO timestamp, "end": ISO timestamp}. Defaults to current timestamp for both.
currency
ISO 4217 currency code.
sandbox
Whether this is simulated data.
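
The default for `reporting_period` is easy to overlook, so here is a small stand-in that follows the listed source. The delivery entry (ids, totals) is made-up sample data.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any


def delivery_sketch(
    media_buy_deliveries: list[dict[str, Any]],
    *,
    reporting_period: dict[str, str] | None = None,
    currency: str = "USD",
) -> dict[str, Any]:
    """Local stand-in that follows the defaults of the listed source."""
    now = datetime.now(timezone.utc).isoformat()
    return {
        # When omitted, start and end are the same instant.
        "reporting_period": reporting_period or {"start": now, "end": now},
        "media_buy_deliveries": media_buy_deliveries,
        "currency": currency,
    }


defaulted = delivery_sketch([])

# Made-up sample delivery for illustration.
explicit = delivery_sketch(
    [{"media_buy_id": "mb_1", "status": "active",
      "totals": {"impressions": 1000, "spend": 12.5}, "by_package": []}],
    reporting_period={"start": "2024-01-01T00:00:00+00:00",
                      "end": "2024-01-31T23:59:59+00:00"},
)
```

Since the default produces a zero-length window, it is probably best to pass `reporting_period` explicitly whenever the delivery numbers cover a real date range.
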
def error_response(code: str, message: str) ‑> dict[str, typing.Any]
def error_response(code: str, message: str) -> dict[str, Any]:
    """Build a single AdCP error object (not a full error response).

    .. deprecated::
        Use ``adcp_error()`` from ``adcp.server.helpers`` instead.
        It returns a properly wrapped ``{"errors": [...]}`` response with
        auto-recovery classification. This function returns an unwrapped
        single error dict ``{"code": ..., "message": ...}`` which is not
        a valid ADCP error response on its own.
    """
    return {"code": code, "message": message}

Build a single AdCP error object (not a full error response).

Deprecated

Use adcp_error() from adcp.server.helpers instead. It returns a properly wrapped {"errors": [...]} response with auto-recovery classification. This function returns an unwrapped single error dict {"code": ..., "message": ...} which is not a valid ADCP error response on its own.

def get_tools_for_handler(handler: ADCPHandler | type[ADCPHandler]) ‑> list[dict[str, typing.Any]]
def get_tools_for_handler(handler: ADCPHandler | type[ADCPHandler]) -> list[dict[str, Any]]:
    """Return tool definitions filtered by handler type.

    Walks the MRO to find the matching handler base class, so subclasses
    (e.g. MyGovernanceAgent(GovernanceHandler)) get the correct tool set.
    ADCPHandler gets all tools. Unknown handlers get only protocol discovery
    (minimum privilege).

    Args:
        handler: The handler instance or class

    Returns:
        Filtered list of tool definitions
    """
    cls = handler if isinstance(handler, type) else type(handler)
    for base in cls.__mro__:
        if base.__name__ in _HANDLER_TOOLS:
            allowed = _HANDLER_TOOLS[base.__name__] | _PROTOCOL_TOOLS
            return [tool for tool in ADCP_TOOL_DEFINITIONS if tool["name"] in allowed]

    return [tool for tool in ADCP_TOOL_DEFINITIONS if tool["name"] in _PROTOCOL_TOOLS]

Return tool definitions filtered by handler type.

Walks the MRO to find the matching handler base class, so subclasses (e.g. MyGovernanceAgent(GovernanceHandler)) get the correct tool set. ADCPHandler gets all tools. Unknown handlers get only protocol discovery (minimum privilege).

Args

handler
The handler instance or class

Returns

Filtered list of tool definitions
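
The MRO walk can be illustrated with a self-contained sketch. Everything below is a stand-in: the handler classes are empty local classes, and the tool tables are tiny hypothetical examples (in particular `check_governance` is an invented tool name), so only the lookup logic mirrors the listed source.

```python
from __future__ import annotations

from typing import Any


# Empty local stand-ins for the real handler classes.
class ADCPHandler: ...
class GovernanceHandler(ADCPHandler): ...
class MyGovernanceAgent(GovernanceHandler): ...
class Unrelated: ...


# Hypothetical tool tables; the real ones are not shown on this page.
_PROTOCOL_TOOLS = {"get_adcp_capabilities"}
_HANDLER_TOOLS = {
    "GovernanceHandler": {"check_governance"},
    "ADCPHandler": {"check_governance", "get_products"},
}
ADCP_TOOL_DEFINITIONS: list[dict[str, Any]] = [
    {"name": "get_adcp_capabilities"},
    {"name": "get_products"},
    {"name": "check_governance"},
]


def tool_names_for(handler: object) -> list[str]:
    """Walk the MRO and use the first base class that has a tool table entry."""
    cls = handler if isinstance(handler, type) else type(handler)
    for base in cls.__mro__:
        if base.__name__ in _HANDLER_TOOLS:
            allowed = _HANDLER_TOOLS[base.__name__] | _PROTOCOL_TOOLS
            return [t["name"] for t in ADCP_TOOL_DEFINITIONS if t["name"] in allowed]
    # Unknown handlers only get protocol discovery.
    return [t["name"] for t in ADCP_TOOL_DEFINITIONS if t["name"] in _PROTOCOL_TOOLS]


governance_tools = tool_names_for(MyGovernanceAgent())
all_tools = tool_names_for(ADCPHandler)
unknown_tools = tool_names_for(Unrelated())
```

Because the match is on `base.__name__`, the most specific known base class wins: `MyGovernanceAgent` resolves through `GovernanceHandler` before the walk reaches `ADCPHandler`.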

def inject_context(params: dict[str, Any], response: dict[str, Any], *, max_size: int = 65536) ‑> dict[str, typing.Any]
def inject_context(
    params: dict[str, Any],
    response: dict[str, Any],
    *,
    max_size: int = _MAX_CONTEXT_SIZE,
) -> dict[str, Any]:
    """Auto-inject context passthrough from request into response.

    ADCP requires that if a request contains a ``context`` field,
    the response must echo it back unchanged. A size limit prevents
    resource amplification from oversized context payloads.

    The context field is opaque and may contain attacker-controlled
    data -- do not interpret or display its contents.
    """
    if "context" in params and "context" not in response:
        import json

        ctx = params["context"]
        if len(json.dumps(ctx, default=str)) <= max_size:
            response["context"] = ctx
    return response

Auto-inject context passthrough from request into response.

ADCP requires that if a request contains a context field, the response must echo it back unchanged. A size limit prevents resource amplification from oversized context payloads.

The context field is opaque and may contain attacker-controlled data – do not interpret or display its contents.
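
A short stand-in that follows the listed source shows the three cases: the context is echoed, dropped for exceeding the size limit, or left alone because the response already carries one. The payloads are made-up values.

```python
from __future__ import annotations

import json
from typing import Any


def inject_context_sketch(
    params: dict[str, Any],
    response: dict[str, Any],
    *,
    max_size: int = 65536,
) -> dict[str, Any]:
    """Local stand-in that follows the listed source."""
    if "context" in params and "context" not in response:
        ctx = params["context"]
        # Size is measured on the JSON-serialized form of the context.
        if len(json.dumps(ctx, default=str)) <= max_size:
            response["context"] = ctx
    return response


echoed = inject_context_sketch({"context": {"trace_id": "abc"}}, {"products": []})
dropped = inject_context_sketch({"context": {"blob": "x" * 100}}, {"products": []}, max_size=50)
kept = inject_context_sketch({"context": {"a": 1}}, {"context": {"b": 2}})
```

Two details follow from the listed code: the response dict is mutated in place and also returned, and an oversized context is dropped silently rather than reported as an error.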

def is_terminal_status(status: str) ‑> bool
def is_terminal_status(status: str) -> bool:
    """Check if a media buy status is terminal (no further actions)."""
    return status in ("completed", "rejected", "canceled")

Check if a media buy status is terminal (no further actions).

def list_creatives_response(creatives: list[Any],
*,
pagination: dict[str, Any] | None = None,
sandbox: bool = True) ‑> dict[str, typing.Any]
def list_creatives_response(
    creatives: list[Any],
    *,
    pagination: dict[str, Any] | None = None,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a list_creatives response.

    Each creative should include: creative_id, name, format_id, status.
    Matches ListCreativesResponse schema.

    Timestamp defaults: every Creative item in the spec requires
    ``created_date`` and ``updated_date`` (ISO 8601 UTC). For any dict
    item that omits either field, this helper fills it with the current
    UTC timestamp (``datetime.now(timezone.utc).isoformat()``). Both
    fields default to the same value when neither is provided, which
    matches the intuitive meaning for a freshly-listed item. Explicit
    caller-provided values are always preserved. Pydantic model items
    are passed through ``_serialize`` unchanged — callers using typed
    Creative models should set timestamps on the model.
    """
    now = datetime.now(timezone.utc).isoformat()
    filled: list[Any] = []
    for item in creatives:
        if isinstance(item, dict):
            has_created = "created_date" in item and item["created_date"] is not None
            has_updated = "updated_date" in item and item["updated_date"] is not None
            if has_created and has_updated:
                filled.append(item)
                continue
            patched = dict(item)
            if not has_created:
                patched["created_date"] = now
            if not has_updated:
                patched["updated_date"] = now
            filled.append(patched)
        else:
            filled.append(item)

    count = len(filled)
    return {
        "creatives": _serialize(filled),
        "pagination": pagination or {"total": count, "has_more": False},
        "query_summary": {"total_results": count, "total_matching": count, "returned": count},
        "sandbox": sandbox,
    }

Build a list_creatives response.

Each creative should include: creative_id, name, format_id, status. Matches ListCreativesResponse schema.

Timestamp defaults: every Creative item in the spec requires created_date and updated_date (ISO 8601 UTC). For any dict item that omits either field, this helper fills it with the current UTC timestamp (datetime.now(timezone.utc).isoformat()). Both fields default to the same value when neither is provided, which matches the intuitive meaning for a freshly-listed item. Explicit caller-provided values are always preserved. Pydantic model items are passed through _serialize unchanged — callers using typed Creative models should set timestamps on the model.
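
The timestamp-filling rule can be sketched in isolation. `fill_timestamps` is a name invented for this example; it reproduces the rule described above but, unlike the listed source, always copies dict items rather than reusing ones that are already complete.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any


def fill_timestamps(creatives: list[Any]) -> list[Any]:
    """Fill missing or None created_date/updated_date on dict items."""
    now = datetime.now(timezone.utc).isoformat()
    filled: list[Any] = []
    for item in creatives:
        if not isinstance(item, dict):
            # Non-dict items (e.g. typed models) pass through untouched.
            filled.append(item)
            continue
        patched = dict(item)  # shallow copy so the caller's dict is not mutated
        for key in ("created_date", "updated_date"):
            if patched.get(key) is None:
                patched[key] = now
        filled.append(patched)
    return filled


original = {"creative_id": "cr_1", "created_date": "2024-01-01T00:00:00+00:00"}
out = fill_timestamps([original, {"creative_id": "cr_2"}])
```

The first item keeps its explicit `created_date` and only gains `updated_date`; the second gets the same timestamp in both fields.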

def log_event_response(events_received: int, events_processed: int, *, sandbox: bool = True) ‑> dict[str, typing.Any]
def log_event_response(
    events_received: int,
    events_processed: int,
    *,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a log_event success response.

    Matches LogEventResponse1 (success) schema.
    """
    return {
        "events_received": events_received,
        "events_processed": events_processed,
        "sandbox": sandbox,
    }

Build a log_event success response.

Matches LogEventResponse1 (success) schema.

def media_buy_error_response(errors: list[dict[str, str]]) ‑> dict[str, typing.Any]
def media_buy_error_response(errors: list[dict[str, str]]) -> dict[str, Any]:
    """Build a create_media_buy error response.

    Each error dict: {"code": "...", "message": "..."}.
    Matches CreateMediaBuyResponse2 (error) schema.
    """
    return {"errors": errors}

Build a create_media_buy error response.

Each error dict: {"code": "…", "message": "…"}. Matches CreateMediaBuyResponse2 (error) schema.
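
The difference between a bare error object and a full error response is easiest to see side by side. The two functions below are copies of `error_response` and `media_buy_error_response` as listed on this page; the message text is a made-up example.

```python
from __future__ import annotations

from typing import Any


def error_response(code: str, message: str) -> dict[str, Any]:
    """Copy of the deprecated helper: returns one bare error object."""
    return {"code": code, "message": message}


def media_buy_error_response(errors: list[dict[str, str]]) -> dict[str, Any]:
    """Copy of the listed helper: wraps error objects in the envelope."""
    return {"errors": errors}


bare = error_response("BUDGET_TOO_LOW", "Budget is below the minimum")
wrapped = media_buy_error_response([bare])
```

Only `wrapped` has the top-level `errors` list. Neither helper adds a `recovery` field, which is one reason the deprecation note points to `adcp_error()` instead.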

def media_buy_response(media_buy_id: str,
packages: list[Any],
*,
buyer_ref: str | None = None,
status: str | None = None,
valid_actions: list[str] | None = None,
revision: int | None = None,
confirmed_at: str | None = None,
sandbox: bool = True) ‑> dict[str, typing.Any]
def media_buy_response(
    media_buy_id: str,
    packages: list[Any],
    *,
    buyer_ref: str | None = None,
    status: str | None = None,
    valid_actions: list[str] | None = None,
    revision: int | None = None,
    confirmed_at: str | None = None,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a create_media_buy success response.

    Each package should include: package_id, product_id, pricing_option_id, budget.
    Matches CreateMediaBuyResponse1 (success) schema.

    Auto-populates valid_actions from status if not provided.
    Auto-sets revision to 1 and confirmed_at to now if not provided.
    """
    resp: dict[str, Any] = {
        "media_buy_id": media_buy_id,
        "packages": _serialize(packages),
        "revision": revision if revision is not None else 1,
        "confirmed_at": confirmed_at or datetime.now(timezone.utc).isoformat(),
        "sandbox": sandbox,
    }
    if buyer_ref is not None:
        resp["buyer_ref"] = buyer_ref
    if status is not None:
        resp["status"] = status
        if valid_actions is None:
            resp["valid_actions"] = valid_actions_for_status(status)
        else:
            resp["valid_actions"] = valid_actions
    elif valid_actions is not None:
        resp["valid_actions"] = valid_actions
    return resp

Build a create_media_buy success response.

Each package should include: package_id, product_id, pricing_option_id, budget. Matches CreateMediaBuyResponse1 (success) schema.

Auto-populates valid_actions from status if not provided. Auto-sets revision to 1 and confirmed_at to now if not provided.
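
The precedence between `status` and `valid_actions` can be sketched as follows. The status-to-actions mapping here is hypothetical (the real `valid_actions_for_status` table is not shown on this page), and the stand-in omits `packages`, `confirmed_at`, and `sandbox` for brevity.

```python
from __future__ import annotations

from typing import Any

# Hypothetical mapping for illustration; not the library's real table.
_VALID_ACTIONS: dict[str, list[str]] = {
    "active": ["pause", "cancel"],
    "canceled": [],
}


def valid_actions_for_status(status: str) -> list[str]:
    """Stand-in lookup that returns a fresh list per call."""
    return list(_VALID_ACTIONS.get(status, []))


def media_buy_sketch(
    media_buy_id: str,
    *,
    status: str | None = None,
    valid_actions: list[str] | None = None,
    revision: int | None = None,
) -> dict[str, Any]:
    """Local stand-in that follows the branching of the listed source."""
    resp: dict[str, Any] = {
        "media_buy_id": media_buy_id,
        # "is not None" keeps an explicit revision of 0 intact.
        "revision": revision if revision is not None else 1,
    }
    if status is not None:
        resp["status"] = status
        resp["valid_actions"] = (
            valid_actions_for_status(status) if valid_actions is None else valid_actions
        )
    elif valid_actions is not None:
        resp["valid_actions"] = valid_actions
    return resp


derived = media_buy_sketch("mb_1", status="active")
explicit = media_buy_sketch("mb_1", status="active", valid_actions=[])
bare = media_buy_sketch("mb_1")
```

An explicit `valid_actions`, even an empty list, always takes precedence over the value derived from `status`; with neither argument, both keys are left out of the response.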

def media_buys_response(media_buys: list[Any], *, sandbox: bool = True) ‑> dict[str, typing.Any]
def media_buys_response(
    media_buys: list[Any],
    *,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a get_media_buys response.

    Each media buy should include: media_buy_id, status, currency, packages.
    Matches GetMediaBuysResponse schema.
    """
    return {
        "media_buys": _serialize(media_buys),
        "sandbox": sandbox,
    }

Build a get_media_buys response.

Each media buy should include: media_buy_id, status, currency, packages. Matches GetMediaBuysResponse schema.

def not_supported(reason: str = 'This operation is not supported by this agent') ‑> NotImplementedResponse
def not_supported(
    reason: str = "This operation is not supported by this agent",
) -> NotImplementedResponse:
    """Create a standard 'not supported' response.

    Return this from operations that your agent does not implement.

    Args:
        reason: Human-readable explanation of why the operation is not supported

    Returns:
        NotImplementedResponse with supported=False
    """
    return NotImplementedResponse(
        supported=False,
        reason=reason,
        error=Error(
            code="NOT_SUPPORTED",
            message=reason,
        ),
    )

Create a standard 'not supported' response.

Return this from operations that your agent does not implement.

Args

reason
Human-readable explanation of why the operation is not supported

Returns

NotImplementedResponse with supported=False

def preview_creative_response(previews: list[dict[str, Any]],
*,
expires_at: str | None = None,
sandbox: bool = True) ‑> dict[str, typing.Any]
def preview_creative_response(
    previews: list[dict[str, Any]],
    *,
    expires_at: str | None = None,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a preview_creative single response.

    Each preview should include:
        preview_id, input ({format_id, name, assets}),
        renders ([{render_id, output_format, preview_url, role, dimensions}]).

    Matches PreviewCreativeResponse1 (single) schema. If expires_at is omitted,
    it is set to the far-future placeholder "2099-12-31T23:59:59Z".
    """
    return {
        "response_type": "single",
        "previews": previews,
        "expires_at": expires_at or "2099-12-31T23:59:59Z",
        "sandbox": sandbox,
    }

Build a preview_creative single response.

Each preview should include: preview_id, input ({format_id, name, assets}), renders ([{render_id, output_format, preview_url, role, dimensions}]).

Matches PreviewCreativeResponse1 (single) schema. If expires_at is omitted, it is set to the far-future placeholder "2099-12-31T23:59:59Z".

def products_response(products: list[Any], *, item_count: int | None = None, sandbox: bool = True) ‑> dict[str, typing.Any]
def products_response(
    products: list[Any],
    *,
    item_count: int | None = None,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a get_products response.

    Matches GetProductsResponse schema.
    """
    serialized = _serialize(products)
    resp: dict[str, Any] = {
        "products": serialized,
        "item_count": item_count if item_count is not None else len(serialized),
        "sandbox": sandbox,
    }
    return resp

Build a get_products response.

Matches GetProductsResponse schema.

def register_test_controller(mcp: Any,
store: TestControllerStore) ‑> None
def register_test_controller(mcp: Any, store: TestControllerStore) -> None:
    """Register the comply_test_controller tool on an MCP server.

    This is the Python equivalent of the JS SDK's registerTestController().
    It adds the comply_test_controller MCP tool backed by your TestControllerStore.

    Args:
        mcp: A FastMCP server instance.
        store: Your TestControllerStore implementation.

    Example:
        from adcp.server.test_controller import TestControllerStore, register_test_controller

        class MyStore(TestControllerStore):
            async def force_account_status(self, account_id, status):
                old = self.accounts[account_id]["status"]
                self.accounts[account_id]["status"] = status
                return {"previous_state": old, "current_state": status}

        mcp = create_mcp_server(MySeller(), name="my-agent")
        register_test_controller(mcp, MyStore())
        mcp.run(transport="streamable-http")
    """

    from mcp.server.fastmcp.tools import Tool
    from mcp.server.fastmcp.utilities.func_metadata import ArgModelBase, FuncMetadata
    from pydantic import ConfigDict

    async def comply_test_controller(**kwargs: Any) -> str:
        result = await _handle_test_controller(store, kwargs)
        return json.dumps(result)

    tool = Tool.from_function(
        comply_test_controller,
        name="comply_test_controller",
        description="Compliance test controller. Sandbox only, not for production use.",
    )

    # Override schema with the proper comply_test_controller inputSchema
    tool.parameters = {
        "type": "object",
        "properties": {
            "scenario": {
                "type": "string",
                "enum": [
                    "list_scenarios",
                    "force_creative_status",
                    "force_account_status",
                    "force_media_buy_status",
                    "force_session_status",
                    "simulate_delivery",
                    "simulate_budget_spend",
                ],
            },
            "params": {"type": "object"},
            "context": {"type": "object"},
        },
        "required": ["scenario"],
    }

    # Override fn_metadata with a permissive model
    class _ControllerArgs(ArgModelBase):
        model_config = ConfigDict(extra="allow")

        def model_dump_one_level(self) -> dict[str, Any]:
            result: dict[str, Any] = {}
            for field_name in self.__class__.model_fields:
                result[field_name] = getattr(self, field_name)
            if self.model_extra:
                result.update(self.model_extra)
            return result

    tool.fn_metadata = FuncMetadata(
        arg_model=_ControllerArgs,
        output_schema=tool.fn_metadata.output_schema,
        output_model=tool.fn_metadata.output_model,
        wrap_output=tool.fn_metadata.wrap_output,
    )

    mcp._tool_manager._tools["comply_test_controller"] = tool

Register the comply_test_controller tool on an MCP server.

This is the Python equivalent of the JS SDK's registerTestController(). It adds the comply_test_controller MCP tool backed by your TestControllerStore.

Args

mcp
A FastMCP server instance.
store
Your TestControllerStore implementation.

Example

from adcp.server.test_controller import TestControllerStore, register_test_controller

class MyStore(TestControllerStore):
    async def force_account_status(self, account_id, status):
        old = self.accounts[account_id]["status"]
        self.accounts[account_id]["status"] = status
        return {"previous_state": old, "current_state": status}

mcp = create_mcp_server(MySeller(), name="my-agent")
register_test_controller(mcp, MyStore())
mcp.run(transport="streamable-http")
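
For orientation, the inputSchema override in the source above implies call arguments of the following shape. This is a minimal sketch that checks a payload against the same scenario enum by hand; check_controller_args is a hypothetical helper, and the keys inside params (account_id, status) are assumptions inferred from the force_account_status signature rather than documented fields.

```python
from typing import Any

# Scenario names copied from the enum in the inputSchema override.
SCENARIOS = (
    "list_scenarios",
    "force_creative_status",
    "force_account_status",
    "force_media_buy_status",
    "force_session_status",
    "simulate_delivery",
    "simulate_budget_spend",
)


def check_controller_args(args: dict[str, Any]) -> list[str]:
    """Return a list of problems; empty means the payload fits the schema."""
    problems: list[str] = []
    if "scenario" not in args:
        problems.append("missing required field 'scenario'")
    elif args["scenario"] not in SCENARIOS:
        problems.append(f"unknown scenario {args['scenario']!r}")
    # params and context are optional, but must be JSON objects when present.
    for key in ("params", "context"):
        if key in args and not isinstance(args[key], dict):
            problems.append(f"'{key}' must be an object")
    return problems


ok_call = {
    "scenario": "force_account_status",
    "params": {"account_id": "acct_1", "status": "suspended"},  # assumed keys
}
bad_call = {"scenario": "force_everything", "params": []}
```

Only scenario is required by the schema; params and context are optional objects.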

async def resolve_account(params: dict[str, Any], resolver: AccountResolver | None) ‑> tuple[typing.Any | None, dict[str, typing.Any] | None]
async def resolve_account(
    params: dict[str, Any],
    resolver: AccountResolver | None,
) -> tuple[Any | None, dict[str, Any] | None]:
    """Resolve an account reference from request params.

    Returns (account, None) on success, (None, error_dict) on failure,
    or (None, None) if no account field or no resolver configured.

    The resolver can return None (auto-ACCOUNT_NOT_FOUND) or raise
    ``AccountError`` for specific error codes (ACCOUNT_SUSPENDED,
    ACCOUNT_PAYMENT_REQUIRED, ACCOUNT_AMBIGUOUS, etc.).
    """
    if resolver is None or "account" not in params:
        return None, None
    try:
        account = await resolver(params["account"])
    except AccountError as e:
        return None, adcp_error(
            e.code,
            e.error_message,
            field="account",
            suggestion=e.suggestion,
        )
    if account is None:
        return None, adcp_error(
            "ACCOUNT_NOT_FOUND",
            "The specified account does not exist",
            field="account",
            suggestion="Use list_accounts to discover available accounts, "
            "or sync_accounts to create one",
        )
    return account, None

Resolve an account reference from request params.

Returns (account, None) on success, (None, error_dict) on failure, or (None, None) if no account field or no resolver configured.

The resolver can return None (auto-ACCOUNT_NOT_FOUND) or raise AccountError for specific error codes (ACCOUNT_SUSPENDED, ACCOUNT_PAYMENT_REQUIRED, ACCOUNT_AMBIGUOUS, etc.).
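
The three return shapes can be exercised with a self-contained sketch. DemoAccountError and demo_resolve_account are stand-ins that imitate the control flow in the source above so the example runs without the library; in real code you would pass your resolver to resolve_account itself. The account IDs, the shape of the account reference, and the simplified error dict are all hypothetical.

```python
import asyncio
from typing import Any, Optional


class DemoAccountError(Exception):
    """Stand-in for AccountError (assumed to carry a code and a message)."""

    def __init__(self, code: str, error_message: str) -> None:
        super().__init__(error_message)
        self.code = code
        self.error_message = error_message


ACCOUNTS = {"acct_1": {"account_id": "acct_1", "status": "active"}}


async def my_resolver(ref: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Return the account, None for 'not found', or raise for a specific error."""
    if ref.get("account_id") == "acct_frozen":
        raise DemoAccountError("ACCOUNT_SUSPENDED", "Account is suspended")
    return ACCOUNTS.get(ref.get("account_id", ""))


async def demo_resolve_account(params, resolver):
    """Imitates resolve_account's control flow with a simplified error dict."""
    if resolver is None or "account" not in params:
        return None, None
    try:
        account = await resolver(params["account"])
    except DemoAccountError as e:
        return None, {"code": e.code, "message": e.error_message, "field": "account"}
    if account is None:
        return None, {"code": "ACCOUNT_NOT_FOUND", "field": "account"}
    return account, None


async def main() -> list[tuple]:
    requests = [
        {"account": {"account_id": "acct_1"}},       # found
        {"account": {"account_id": "missing"}},      # resolver returns None
        {"account": {"account_id": "acct_frozen"}},  # resolver raises
        {},                                          # no account field
    ]
    return [await demo_resolve_account(r, my_resolver) for r in requests]


results = asyncio.run(main())
```

A handler would typically return the error dict as its response when the second element is not None, and proceed with the account otherwise.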

def serve(handler: ADCPHandler | Any,
*,
name: str = 'adcp-agent',
port: int | None = None,
transport: str = 'streamable-http',
instructions: str | None = None,
test_controller: TestControllerStore | None = None) ‑> None
def serve(
    handler: ADCPHandler | Any,
    *,
    name: str = "adcp-agent",
    port: int | None = None,
    transport: str = "streamable-http",
    instructions: str | None = None,
    test_controller: TestControllerStore | None = None,
) -> None:
    """Start an MCP or A2A server from an ADCP handler or server builder.

    Accepts either an ``ADCPHandler`` instance or an ``ADCPServerBuilder``
    (from ``adcp_server()``). Builders are auto-converted via ``build_handler()``.

    This is the simplest way to run an ADCP agent. Set ``transport="a2a"``
    to serve over the A2A protocol instead of MCP.

    Args:
        handler: An ADCPHandler subclass instance with your tool implementations,
            or an ADCPServerBuilder returned by adcp_server().
        name: Server name shown to clients / in the A2A agent card.
        port: Port to listen on. Defaults to PORT env var, then 3001.
        transport: ``"streamable-http"`` (default), ``"sse"``, or ``"stdio"`` for MCP;
            ``"a2a"`` for A2A. Any other value raises ValueError.
        instructions: Optional system instructions for the agent (MCP only).
        test_controller: Optional TestControllerStore instance for storyboard testing.

    Security:
        This function does NOT configure authentication. In production,
        use a reverse proxy or middleware that validates credentials
        before forwarding to the endpoint. Without authentication,
        MCP exposes tools/list and A2A exposes /.well-known/agent.json,
        both of which reveal the agent's full capability surface.

    Example (MCP):
        from adcp.server import ADCPHandler, serve
        from adcp.server.responses import capabilities_response

        class MyAgent(ADCPHandler):
            async def get_adcp_capabilities(self, params, context=None):
                return capabilities_response(["media_buy"])

        serve(MyAgent(), name="my-agent")

    Example (A2A):
        serve(MyAgent(), name="my-agent", transport="a2a")

    With test controller:
        from adcp.server.test_controller import TestControllerStore

        class MyStore(TestControllerStore):
            async def force_account_status(self, account_id, status):
                ...

        serve(MyAgent(), name="my-agent", test_controller=MyStore())
    """
    # Accept ADCPServerBuilder from adcp_server() decorator pattern
    from adcp.server.builder import ADCPServerBuilder

    if isinstance(handler, ADCPServerBuilder):
        if not name or name == "adcp-agent":
            name = handler.name
        handler = handler.build_handler()

    if transport == "a2a":
        _serve_a2a(handler, name=name, port=port, test_controller=test_controller)
    elif transport in ("streamable-http", "sse", "stdio"):
        _serve_mcp(
            handler,
            name=name,
            port=port,
            transport=transport,
            instructions=instructions,
            test_controller=test_controller,
        )
    else:
        valid = ", ".join(sorted(("a2a", "streamable-http", "sse", "stdio")))
        raise ValueError(f"Unknown transport {transport!r}. Valid: {valid}")

Start an MCP or A2A server from an ADCP handler or server builder.

Accepts either an ADCPHandler instance or an ADCPServerBuilder (from adcp_server()). Builders are auto-converted via build_handler().

This is the simplest way to run an ADCP agent. Set transport="a2a" to serve over the A2A protocol instead of MCP.

Args

handler
An ADCPHandler subclass instance with your tool implementations, or an ADCPServerBuilder returned by adcp_server().
name
Server name shown to clients / in the A2A agent card.
port
Port to listen on. Defaults to PORT env var, then 3001.
transport
"streamable-http" (default), "sse", or "stdio" for MCP; "a2a" for A2A. Any other value raises ValueError.
instructions
Optional system instructions for the agent (MCP only).
test_controller
Optional TestControllerStore instance for storyboard testing.
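
The port fallback described above (explicit argument, then the PORT environment variable, then 3001) might look like the following. This is a sketch of the documented precedence only; resolve_port is a hypothetical helper, not the function serve() actually uses internally.

```python
import os
from typing import Optional


def resolve_port(port: Optional[int] = None) -> int:
    """Explicit argument wins, then the PORT env var, then 3001."""
    if port is not None:
        return port
    return int(os.environ.get("PORT", "3001"))


os.environ.pop("PORT", None)
default_port = resolve_port()        # neither argument nor env var set

os.environ["PORT"] = "8080"
env_port = resolve_port()            # taken from the environment
explicit_port = resolve_port(9000)   # argument overrides the environment
os.environ.pop("PORT", None)         # remove the demo value again
```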

Security

This function does NOT configure authentication. In production, use a reverse proxy or middleware that validates credentials before forwarding to the endpoint. Without authentication, MCP exposes tools/list and A2A exposes /.well-known/agent.json, both of which reveal the agent's full capability surface.

Example (MCP):

from adcp.server import ADCPHandler, serve
from adcp.server.responses import capabilities_response

class MyAgent(ADCPHandler):
    async def get_adcp_capabilities(self, params, context=None):
        return capabilities_response(["media_buy"])

serve(MyAgent(), name="my-agent")

Example (A2A):

serve(MyAgent(), name="my-agent", transport="a2a")

With test controller:

from adcp.server.test_controller import TestControllerStore

class MyStore(TestControllerStore):
    async def force_account_status(self, account_id, status):
        ...

serve(MyAgent(), name="my-agent", test_controller=MyStore())

def signals_response(signals: list[Any], *, sandbox: bool = True) ‑> dict[str, typing.Any]
def signals_response(
    signals: list[Any],
    *,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a get_signals response.

    Each signal should include: signal_agent_segment_id, name, description,
    signal_type, data_provider, coverage_percentage, deployments, pricing_options, signal_id.
    Matches GetSignalsResponse schema.
    """
    return {
        "signals": _serialize(signals),
        "sandbox": sandbox,
    }

Build a get_signals response.

Each signal should include: signal_agent_segment_id, name, description, signal_type, data_provider, coverage_percentage, deployments, pricing_options, signal_id. Matches GetSignalsResponse schema.

def sync_accounts_response(accounts: list[dict[str, Any]], *, sandbox: bool = True) ‑> dict[str, typing.Any]
def sync_accounts_response(
    accounts: list[dict[str, Any]],
    *,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a sync_accounts success response.

    Each account dict should include: account_id, brand, operator,
    action ("created"|"updated"), status ("active"|"pending_approval").

    Matches SyncAccountsResponse1 schema (field: "accounts").
    """
    return {"accounts": accounts, "sandbox": sandbox}

Build a sync_accounts success response.

Each account dict should include: account_id, brand, operator, action ("created"|"updated"), status ("active"|"pending_approval").

Matches SyncAccountsResponse1 schema (field: "accounts").

def sync_catalogs_response(catalogs: list[dict[str, Any]], *, sandbox: bool = True) ‑> dict[str, typing.Any]
def sync_catalogs_response(
    catalogs: list[dict[str, Any]],
    *,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a sync_catalogs success response.

    Each catalog should include: catalog_id, action, item_count, items_approved.
    Matches SyncCatalogsResponse1 (success) schema.
    """
    return {
        "catalogs": catalogs,
        "sandbox": sandbox,
    }

Build a sync_catalogs success response.

Each catalog should include: catalog_id, action, item_count, items_approved. Matches SyncCatalogsResponse1 (success) schema.

def sync_creatives_response(creatives: list[dict[str, Any]], *, sandbox: bool = True) ‑> dict[str, typing.Any]
def sync_creatives_response(
    creatives: list[dict[str, Any]],
    *,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a sync_creatives success response.

    Each creative dict should include: creative_id, action ("created"|"updated").
    Optionally: status ("processing"|"pending_review"|"approved"|"rejected"|"archived").
    Matches SyncCreativesResponse1 schema (field: "creatives").
    """
    return {"creatives": creatives, "sandbox": sandbox}

Build a sync_creatives success response.

Each creative dict should include: creative_id, action ("created"|"updated"). Optionally: status ("processing"|"pending_review"|"approved"|"rejected"|"archived"). Matches SyncCreativesResponse1 schema (field: "creatives").

def sync_governance_response(accounts: list[dict[str, Any]], *, sandbox: bool = True) ‑> dict[str, typing.Any]
def sync_governance_response(
    accounts: list[dict[str, Any]],
    *,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build a sync_governance response.

    Each account dict should include: account, status ("synced"),
    governance_agents ([{url, categories}]).
    """
    return {"accounts": accounts, "sandbox": sandbox}

Build a sync_governance response.

Each account dict should include: account, status ("synced"), governance_agents ([{url, categories}]).

def update_media_buy_response(media_buy_id: str,
*,
affected_packages: list[Any] | None = None,
status: str | None = None,
valid_actions: list[str] | None = None,
revision: int | None = None,
sandbox: bool = True) ‑> dict[str, typing.Any]
def update_media_buy_response(
    media_buy_id: str,
    *,
    affected_packages: list[Any] | None = None,
    status: str | None = None,
    valid_actions: list[str] | None = None,
    revision: int | None = None,
    sandbox: bool = True,
) -> dict[str, Any]:
    """Build an update_media_buy success response.

    Matches UpdateMediaBuyResponse1 (success) schema.
    When status is given and valid_actions is not, valid_actions is derived
    from status; with neither, the key is omitted.
    """
    resp: dict[str, Any] = {
        "media_buy_id": media_buy_id,
        "sandbox": sandbox,
    }
    if affected_packages is not None:
        resp["affected_packages"] = _serialize(affected_packages)
    if status is not None:
        resp["status"] = status
        if valid_actions is None:
            resp["valid_actions"] = valid_actions_for_status(status)
        else:
            resp["valid_actions"] = valid_actions
    elif valid_actions is not None:
        resp["valid_actions"] = valid_actions
    if revision is not None:
        resp["revision"] = revision
    return resp

Build an update_media_buy success response.

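Matches UpdateMediaBuyResponse1 (success) schema. When status is given and valid_actions is not, valid_actions is derived from status; with neither, the key is omitted. affected_packages and revision are included only when provided.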

def valid_actions_for_status(status: str) ‑> list[str]
def valid_actions_for_status(status: str) -> list[str]:
    """Get valid buyer actions for a media buy status."""
    return list(MEDIA_BUY_STATE_MACHINE.get(status, []))

Get valid buyer actions for a media buy status.

def validate_capabilities(handler: Any, capabilities: GetAdcpCapabilitiesResponse) ‑> list[str]
def validate_capabilities(
    handler: Any,
    capabilities: GetAdcpCapabilitiesResponse,
) -> list[str]:
    """Check that a handler implements the methods required by its declared features.

    Compares the features declared in a capabilities response against the handler's
    method implementations. Returns warnings for features that are declared but
    whose corresponding handler methods are not overridden from the base class.

    This is a development-time check — call it at startup to catch misconfigurations.

    Args:
        handler: An ADCPHandler instance (or any object with handler methods).
        capabilities: The capabilities response the handler will serve.

    Returns:
        List of warning strings. Empty if everything is consistent.
    """
    # Late import to avoid circular dependency: server.base imports from adcp.types
    # which may transitively import from this module.
    from adcp.server.base import ADCPHandler

    resolver = FeatureResolver(capabilities)
    warnings: list[str] = []

    for feature, handler_methods in FEATURE_HANDLER_MAP.items():
        if not resolver.supports(feature):
            continue

        for method_name in handler_methods:
            if not hasattr(handler, method_name):
                warnings.append(
                    f"Feature '{feature}' is declared but handler has no "
                    f"'{method_name}' method"
                )
                continue

            # Walk MRO to check if any class between the leaf and ADCPHandler
            # overrides the method (handles mixin / intermediate-class patterns).
            if isinstance(handler, ADCPHandler):
                overridden = any(
                    method_name in cls.__dict__
                    for cls in type(handler).__mro__
                    if cls is not ADCPHandler and not issubclass(ADCPHandler, cls)
                )
                if not overridden:
                    warnings.append(
                        f"Feature '{feature}' is declared but '{method_name}' "
                        f"is not overridden from ADCPHandler"
                    )

    return warnings

Check that a handler implements the methods required by its declared features.

Compares the features declared in a capabilities response against the handler's method implementations. Returns warnings for features that are declared but whose corresponding handler methods are not overridden from the base class.

This is a development-time check — call it at startup to catch misconfigurations.

Args

handler
An ADCPHandler instance (or any object with handler methods).
capabilities
The capabilities response the handler will serve.

Returns

List of warning strings. Empty if everything is consistent.
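
The override detection can be reproduced in isolation. The sketch below applies the same MRO walk to toy classes; DemoBase stands in for ADCPHandler, and ProductsMixin, Seller, and is_overridden are hypothetical names used only for illustration.

```python
class DemoBase:
    """Stand-in for ADCPHandler: every operation has a default stub."""

    async def get_products(self, params, context=None):
        return {"supported": False}

    async def get_signals(self, params, context=None):
        return {"supported": False}


class ProductsMixin:
    """Implements one operation outside the DemoBase hierarchy."""

    async def get_products(self, params, context=None):
        return {"products": []}


class Seller(ProductsMixin, DemoBase):
    pass


def is_overridden(handler: object, method_name: str) -> bool:
    """True if any class other than DemoBase and its ancestors defines the method."""
    return any(
        method_name in cls.__dict__
        for cls in type(handler).__mro__
        if cls is not DemoBase and not issubclass(DemoBase, cls)
    )


products_overridden = is_overridden(Seller(), "get_products")
signals_overridden = is_overridden(Seller(), "get_signals")
```

Because the walk inspects each class's own __dict__, a method supplied by a mixin counts as an override even though the leaf class defines nothing itself, while a method inherited unchanged from the base does not.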

Classes

class ADCPAgentExecutor (handler: ADCPHandler,
test_controller: TestControllerStore | None = None)
class ADCPAgentExecutor(AgentExecutor):
    """Bridges ADCPHandler methods to the a2a-sdk AgentExecutor interface.

    Incoming A2A messages are parsed to extract the ADCP skill name and
    parameters, dispatched to the matching handler method, and the result
    is published back as A2A Task events.

    Expects the explicit skill invocation format used by A2AAdapter:
        DataPart(data={"skill": "get_products", "parameters": {...}})
    """

    def __init__(
        self,
        handler: ADCPHandler,
        test_controller: TestControllerStore | None = None,
    ) -> None:
        self._handler = handler
        self._tool_callers: dict[str, Any] = {}

        # Build tool callers for all tools this handler supports.
        # Skip comply_test_controller unless the seller passed a
        # TestControllerStore; otherwise we would advertise a skill
        # backed only by the handler's not-supported stub.
        tool_defs = get_tools_for_handler(handler)
        for tool_def in tool_defs:
            name = tool_def["name"]
            if name == "comply_test_controller" and test_controller is None:
                continue
            self._tool_callers[name] = create_tool_caller(handler, name)

        if test_controller is not None:
            self._register_test_controller(test_controller)

    @property
    def supported_skills(self) -> list[str]:
        """List of skill names this executor can handle."""
        return list(self._tool_callers.keys())

    def _register_test_controller(self, store: TestControllerStore) -> None:
        """Register comply_test_controller as a callable skill."""

        async def _call_test_controller(
            params: dict[str, Any], context: ToolContext | None = None
        ) -> Any:
            return await _handle_test_controller(store, params)

        self._tool_callers["comply_test_controller"] = _call_test_controller

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        """Execute an ADCP skill from an incoming A2A message."""
        skill_name, params = self._parse_request(context)

        if skill_name is None:
            await self._send_error(event_queue, context, "No skill specified in message")
            return

        if skill_name not in self._tool_callers:
            await self._send_error(event_queue, context, f"Unknown skill: {skill_name}")
            return

        tool_context = _tool_context_from_request(context)
        try:
            result = await self._tool_callers[skill_name](params, tool_context)
            await self._send_result(event_queue, context, skill_name, result)
        except ADCPError as exc:
            # Application-layer AdCP error (IdempotencyConflictError etc.).
            # Emit a failed task with the adcp_error in a DataPart per
            # transport-errors.mdx §A2A Binding, plus a human-readable text
            # part. The JSON-RPC channel is reserved for transport-level
            # errors (auth rejected, rate-limited pre-dispatch).
            logger.info("AdCP application error for skill %s: %s", skill_name, exc)
            await self._send_adcp_error(event_queue, context, exc)
        except Exception:
            logger.exception("Error executing skill %s", skill_name)
            await self._send_error(event_queue, context, f"Skill execution failed: {skill_name}")

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        """ADCP operations are synchronous; cancellation sets state to canceled."""
        event = _make_task(
            context,
            state=TaskState.canceled,
            message="Task canceled",
        )
        await event_queue.enqueue_event(event)

    # ------------------------------------------------------------------
    # Message parsing
    # ------------------------------------------------------------------

    def _parse_request(self, context: RequestContext) -> tuple[str | None, dict[str, Any]]:
        """Extract skill name and parameters from the A2A message.

        Supports two formats:
        1. Explicit skill invocation via DataPart:
           DataPart(data={"skill": "get_products", "parameters": {...}})
        2. JSON fallback via TextPart: text that parses as a JSON object
           with a "skill" key (best-effort parse)
        """
        msg = context.message
        if msg is None or not msg.parts:
            return None, {}

        # Try DataPart first (explicit skill invocation)
        for part in msg.parts:
            inner = part.root if hasattr(part, "root") else part
            if isinstance(inner, DataPart) and isinstance(inner.data, dict):
                skill = inner.data.get("skill")
                params = inner.data.get("parameters", {})
                if skill:
                    return str(skill), params if isinstance(params, dict) else {}

        # Fallback: try to parse TextPart as JSON
        for part in msg.parts:
            inner = part.root if hasattr(part, "root") else part
            if isinstance(inner, TextPart):
                parsed = self._parse_text_request(inner.text)
                if parsed[0] is not None:
                    return parsed

        return None, {}

    def _parse_text_request(self, text: str) -> tuple[str | None, dict[str, Any]]:
        """Best-effort parse of a text request for skill + params."""
        try:
            data = json.loads(text)
            if isinstance(data, dict) and "skill" in data:
                return str(data["skill"]), data.get("parameters", {})
        except (json.JSONDecodeError, TypeError):
            pass
        return None, {}

    # ------------------------------------------------------------------
    # Response helpers
    # ------------------------------------------------------------------

    async def _send_result(
        self,
        event_queue: EventQueue,
        context: RequestContext,
        skill_name: str,
        result: Any,
    ) -> None:
        """Publish a completed task with the skill result."""
        # Normalize result to a JSON-safe dict
        if hasattr(result, "model_dump"):
            data = result.model_dump(mode="json", exclude_none=True)
        elif not isinstance(result, dict):
            data = {"result": result}
        else:
            data = result

        task = _make_task(
            context,
            state=TaskState.completed,
            data=data,
            message=f"Completed {skill_name}",
        )
        await event_queue.enqueue_event(task)

    async def _send_error(
        self,
        event_queue: EventQueue,
        context: RequestContext,
        error_msg: str,
    ) -> None:
        """Publish a failed task."""
        task = _make_task(
            context,
            state=TaskState.failed,
            message=error_msg,
        )
        await event_queue.enqueue_event(task)

    async def _send_adcp_error(
        self,
        event_queue: EventQueue,
        context: RequestContext,
        exc: ADCPError,
    ) -> None:
        """Publish a failed task carrying an AdCP ``adcp_error`` payload.

        Follows transport-errors.mdx §A2A Binding: failed task with artifact
        containing a ``DataPart`` keyed under ``adcp_error`` plus a terse
        ``TextPart`` for human/LLM consumption.
        """
        # Derive the spec error code. ADCPTaskError carries a list of codes
        # (e.g. IdempotencyConflictError → IDEMPOTENCY_CONFLICT); fall back
        # to a generic INTERNAL_ERROR when the exception doesn't supply one.
        code = "INTERNAL_ERROR"
        if isinstance(exc, ADCPTaskError) and exc.error_codes:
            code = str(exc.error_codes[0])

        adcp_error: dict[str, Any] = {
            "code": code,
            "message": exc.message,
        }
        recovery = STANDARD_ERROR_CODES.get(code, {}).get("recovery")
        if recovery:
            adcp_error["recovery"] = recovery
        suggestion = getattr(exc, "suggestion", None)
        if suggestion:
            adcp_error["suggestion"] = suggestion

        task = _make_task(
            context,
            state=TaskState.failed,
            data={"adcp_error": adcp_error},
            message=exc.message,
        )
        await event_queue.enqueue_event(task)

Bridges ADCPHandler methods to the a2a-sdk AgentExecutor interface.

Incoming A2A messages are parsed to extract the ADCP skill name and parameters, dispatched to the matching handler method, and the result is published back as A2A Task events.

Expects the explicit skill invocation format used by A2AAdapter: DataPart(data={"skill": "get_products", "parameters": {…}})
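
As a rough illustration of the payload shape, the sketch below parses the JSON text form of a skill invocation with the same logic as _parse_text_request in the source above. parse_skill_text is a hypothetical stand-alone copy, and the brief parameter is an invented example value.

```python
import json
from typing import Any, Optional


def parse_skill_text(text: str) -> tuple[Optional[str], dict[str, Any]]:
    """Return (skill, parameters) if text is a JSON object with a "skill" key."""
    try:
        data = json.loads(text)
        if isinstance(data, dict) and "skill" in data:
            return str(data["skill"]), data.get("parameters", {})
    except (json.JSONDecodeError, TypeError):
        pass
    return None, {}


message = json.dumps({"skill": "get_products", "parameters": {"brief": "video ads"}})
parsed = parse_skill_text(message)
rejected = parse_skill_text("show me some products")
```

Plain prose is not interpreted: text that is not a JSON object with a skill key yields no skill, and execute() then publishes a failed task with the message "No skill specified in message".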

Ancestors

  • a2a.server.agent_execution.agent_executor.AgentExecutor
  • abc.ABC

Instance variables

prop supported_skills : list[str]
@property
def supported_skills(self) -> list[str]:
    """List of skill names this executor can handle."""
    return list(self._tool_callers.keys())

List of skill names this executor can handle.

Methods

async def cancel(self, context: RequestContext, event_queue: EventQueue) ‑> None
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
    """ADCP operations are synchronous; cancellation sets state to canceled."""
    event = _make_task(
        context,
        state=TaskState.canceled,
        message="Task canceled",
    )
    await event_queue.enqueue_event(event)

ADCP operations are synchronous; cancellation sets state to canceled.

async def execute(self, context: RequestContext, event_queue: EventQueue) ‑> None
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
    """Execute an ADCP skill from an incoming A2A message."""
    skill_name, params = self._parse_request(context)

    if skill_name is None:
        await self._send_error(event_queue, context, "No skill specified in message")
        return

    if skill_name not in self._tool_callers:
        await self._send_error(event_queue, context, f"Unknown skill: {skill_name}")
        return

    tool_context = _tool_context_from_request(context)
    try:
        result = await self._tool_callers[skill_name](params, tool_context)
        await self._send_result(event_queue, context, skill_name, result)
    except ADCPError as exc:
        # Application-layer AdCP error (IdempotencyConflictError etc.).
        # Emit a failed task with the adcp_error in a DataPart per
        # transport-errors.mdx §A2A Binding, plus a human-readable text
        # part. The JSON-RPC channel is reserved for transport-level
        # errors (auth rejected, rate-limited pre-dispatch).
        logger.info("AdCP application error for skill %s: %s", skill_name, exc)
        await self._send_adcp_error(event_queue, context, exc)
    except Exception:
        logger.exception("Error executing skill %s", skill_name)
        await self._send_error(event_queue, context, f"Skill execution failed: {skill_name}")

Execute an ADCP skill from an incoming A2A message.
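The branch order in `execute` can be followed in a self-contained sketch. Everything here is a stand-in: `ADCPError` is redefined locally, `dispatch` returns a dict instead of enqueuing A2A Task events, and the `"completed"` state for the success path is an assumption, since `_send_result` is not shown in this section.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


class ADCPError(Exception):
    """Local stand-in for the library's application-level error type."""

    def __init__(self, message: str) -> None:
        super().__init__(message)
        self.message = message


ToolCaller = Callable[[dict[str, Any], Any], Awaitable[Any]]


async def dispatch(
    tool_callers: dict[str, ToolCaller],
    skill_name: str | None,
    params: dict[str, Any],
) -> dict[str, Any]:
    """Follow the same branch order as execute(), returning a summary dict."""
    if skill_name is None:
        return {"state": "failed", "message": "No skill specified in message"}
    if skill_name not in tool_callers:
        return {"state": "failed", "message": f"Unknown skill: {skill_name}"}
    try:
        result = await tool_callers[skill_name](params, None)
        # Assumed success state; the real executor publishes via _send_result.
        return {"state": "completed", "result": result}
    except ADCPError as exc:
        # Application error: the caller sees the error's own message.
        return {"state": "failed", "adcp_error": exc.message}
    except Exception:
        # Unexpected failure: the caller gets a generic message only.
        return {"state": "failed", "message": f"Skill execution failed: {skill_name}"}


async def get_products(params: dict[str, Any], context: Any) -> dict[str, Any]:
    return {"products": []}


async def conflict(params: dict[str, Any], context: Any) -> dict[str, Any]:
    raise ADCPError("idempotency conflict")


async def broken(params: dict[str, Any], context: Any) -> dict[str, Any]:
    raise RuntimeError("database unreachable")


callers = {"get_products": get_products, "conflict": conflict, "broken": broken}
names = (None, "nope", "get_products", "conflict", "broken")
outcomes = [asyncio.run(dispatch(callers, name, {})) for name in names]
```

Note that the generic branch never echoes the exception text back to the caller, which mirrors how `execute` logs the traceback and sends only the skill name.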

class ADCPHandler
class ADCPHandler(ABC):
    """Base class for ADCP operation handlers.

    Subclass this to implement ADCP operations. All operations have default
    implementations that return 'not supported', allowing you to implement
    only the operations your agent supports.

    For protocol-specific handlers, use:
    - ContentStandardsHandler: For content standards agents
    - SponsoredIntelligenceHandler: For sponsored intelligence agents
    - GovernanceHandler: For governance agents
    """

    _agent_type: str = "this agent"

    def _not_supported(self, operation: str) -> NotImplementedResponse:
        """Create a not-supported response that includes the agent type."""
        return not_supported(f"{operation} is not supported by {self._agent_type}")

    # ========================================================================
    # Core Catalog Operations
    # ========================================================================

    async def get_products(
        self, params: GetProductsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get advertising products.

        Override this to provide product catalog functionality.
        """
        return self._not_supported("get_products")

    async def list_creative_formats(
        self,
        params: ListCreativeFormatsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """List supported creative formats.

        Override this to provide creative format information.
        """
        return self._not_supported("list_creative_formats")

    # ========================================================================
    # Creative Operations
    # ========================================================================

    async def sync_creatives(
        self, params: SyncCreativesRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync creatives.

        Override this to handle creative synchronization.
        """
        return self._not_supported("sync_creatives")

    async def list_creatives(
        self, params: ListCreativesRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """List creatives.

        Override this to list synced creatives.
        """
        return self._not_supported("list_creatives")

    async def build_creative(
        self, params: BuildCreativeRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Build a creative.

        Override this to build creatives from assets.
        """
        return self._not_supported("build_creative")

    async def preview_creative(
        self, params: PreviewCreativeRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Preview a creative rendering.

        Override this to provide creative preview functionality.
        """
        return self._not_supported("preview_creative")

    async def get_creative_delivery(
        self,
        params: GetCreativeDeliveryRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Get creative delivery metrics.

        Override this to provide creative delivery reporting.
        """
        return self._not_supported("get_creative_delivery")

    # ========================================================================
    # Media Buy Operations
    # ========================================================================

    async def create_media_buy(
        self, params: CreateMediaBuyRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Create a media buy.

        Override this to handle media buy creation.
        """
        return self._not_supported("create_media_buy")

    async def update_media_buy(
        self, params: UpdateMediaBuyRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Update a media buy.

        Override this to handle media buy updates.
        """
        return self._not_supported("update_media_buy")

    async def get_media_buy_delivery(
        self,
        params: GetMediaBuyDeliveryRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Get media buy delivery metrics.

        Override this to provide delivery reporting.
        """
        return self._not_supported("get_media_buy_delivery")

    async def get_media_buys(
        self, params: GetMediaBuysRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get media buys with status and optional delivery snapshots.

        Override this to provide media buy listing functionality.
        """
        return self._not_supported("get_media_buys")

    # ========================================================================
    # Signal Operations
    # ========================================================================

    async def get_signals(
        self, params: GetSignalsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get available signals.

        Override this to provide signal catalog.
        """
        return self._not_supported("get_signals")

    async def activate_signal(
        self, params: ActivateSignalRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Activate a signal.

        Override this to handle signal activation.
        """
        return self._not_supported("activate_signal")

    # ========================================================================
    # Feedback Operations
    # ========================================================================

    async def provide_performance_feedback(
        self,
        params: ProvidePerformanceFeedbackRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Provide performance feedback.

        Override this to handle performance feedback ingestion.
        """
        return self._not_supported("provide_performance_feedback")

    # ========================================================================
    # Account Operations
    # ========================================================================

    async def list_accounts(
        self, params: ListAccountsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """List accounts.

        Override this to provide account listing.
        """
        return self._not_supported("list_accounts")

    async def sync_accounts(
        self, params: SyncAccountsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync accounts.

        Override this to handle account synchronization.
        """
        return self._not_supported("sync_accounts")

    async def get_account_financials(
        self,
        params: GetAccountFinancialsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Get account financials.

        Override this to provide account financial reporting.
        """
        return self._not_supported("get_account_financials")

    async def report_usage(
        self, params: ReportUsageRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Report account usage.

        Override this to ingest account usage.
        """
        return self._not_supported("report_usage")

    # ========================================================================
    # Event Operations
    # ========================================================================

    async def log_event(
        self, params: LogEventRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Log an event.

        Override this to handle event logging.
        """
        return self._not_supported("log_event")

    async def sync_event_sources(
        self, params: SyncEventSourcesRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync event sources.

        Override this to handle event source synchronization.
        """
        return self._not_supported("sync_event_sources")

    async def sync_audiences(
        self, params: SyncAudiencesRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync audiences.

        Override this to provide audience synchronization.
        """
        return self._not_supported("sync_audiences")

    async def sync_governance(
        self, params: SyncGovernanceRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync governance agents for accounts.

        Override this to handle governance agent registration.
        """
        return self._not_supported("sync_governance")

    async def sync_catalogs(
        self, params: SyncCatalogsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync catalogs.

        Override this to provide catalog synchronization.
        """
        return self._not_supported("sync_catalogs")

    # ========================================================================
    # V3 Protocol Discovery
    # ========================================================================

    async def get_adcp_capabilities(
        self,
        params: GetAdcpCapabilitiesRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Get ADCP capabilities.

        Override this to advertise your agent's capabilities.
        """
        return self._not_supported("get_adcp_capabilities")

    # ========================================================================
    # V3 Content Standards Operations
    # ========================================================================

    async def create_content_standards(
        self,
        params: CreateContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Create content standards configuration.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("create_content_standards")

    async def get_content_standards(
        self,
        params: GetContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Get content standards configuration.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("get_content_standards")

    async def list_content_standards(
        self,
        params: ListContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """List content standards configurations.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("list_content_standards")

    async def update_content_standards(
        self,
        params: UpdateContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Update content standards configuration.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("update_content_standards")

    async def calibrate_content(
        self, params: CalibrateContentRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Calibrate content against standards.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("calibrate_content")

    async def validate_content_delivery(
        self,
        params: ValidateContentDeliveryRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Validate content delivery against standards.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("validate_content_delivery")

    async def get_media_buy_artifacts(
        self,
        params: GetMediaBuyArtifactsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Get artifacts associated with a media buy.

        Override this in ContentStandardsHandler subclasses.
        """
        return self._not_supported("get_media_buy_artifacts")

    # ========================================================================
    # V3 Sponsored Intelligence Operations
    # ========================================================================

    async def si_get_offering(
        self, params: SiGetOfferingRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get sponsored intelligence offering.

        Override this in SponsoredIntelligenceHandler subclasses.
        """
        return self._not_supported("si_get_offering")

    async def si_initiate_session(
        self, params: SiInitiateSessionRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Initiate sponsored intelligence session.

        Override this in SponsoredIntelligenceHandler subclasses.
        """
        return self._not_supported("si_initiate_session")

    async def si_send_message(
        self, params: SiSendMessageRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Send message in sponsored intelligence session.

        Override this in SponsoredIntelligenceHandler subclasses.
        """
        return self._not_supported("si_send_message")

    async def si_terminate_session(
        self, params: SiTerminateSessionRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Terminate sponsored intelligence session.

        Override this in SponsoredIntelligenceHandler subclasses.
        """
        return self._not_supported("si_terminate_session")

    # ========================================================================
    # V3 Governance Operations
    # ========================================================================

    async def get_creative_features(
        self,
        params: GetCreativeFeaturesRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Evaluate governance features for a creative.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("get_creative_features")

    async def sync_plans(
        self, params: SyncPlansRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Sync campaign governance plans.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("sync_plans")

    async def check_governance(
        self, params: CheckGovernanceRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Check an action against campaign governance.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("check_governance")

    async def report_plan_outcome(
        self, params: ReportPlanOutcomeRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Report the outcome of a governed action.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("report_plan_outcome")

    async def get_plan_audit_logs(
        self, params: GetPlanAuditLogsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Retrieve governance audit logs for plans.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("get_plan_audit_logs")

    async def create_property_list(
        self, params: CreatePropertyListRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Create a property list for governance filtering.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("create_property_list")

    async def get_property_list(
        self, params: GetPropertyListRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get a property list with optional resolution.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("get_property_list")

    async def list_property_lists(
        self, params: ListPropertyListsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """List property lists.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("list_property_lists")

    async def update_property_list(
        self, params: UpdatePropertyListRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Update a property list.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("update_property_list")

    async def delete_property_list(
        self, params: DeletePropertyListRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Delete a property list.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("delete_property_list")

    # ========================================================================
    # V3 Governance (Collection Lists) Operations
    # ========================================================================

    async def create_collection_list(
        self,
        params: CreateCollectionListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Create a collection list for governance filtering.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("create_collection_list")

    async def get_collection_list(
        self, params: GetCollectionListRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get a collection list with optional resolution.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("get_collection_list")

    async def list_collection_lists(
        self,
        params: ListCollectionListsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """List collection lists.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("list_collection_lists")

    async def update_collection_list(
        self,
        params: UpdateCollectionListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Update a collection list.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("update_collection_list")

    async def delete_collection_list(
        self,
        params: DeleteCollectionListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Delete a collection list.

        Override this in GovernanceHandler subclasses.
        """
        return self._not_supported("delete_collection_list")

    # ========================================================================
    # V3 TMP Operations
    # ========================================================================

    async def context_match(
        self, params: ContextMatchRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Match ad context to buyer packages.

        Override this to provide TMP context matching.
        """
        return self._not_supported("context_match")

    async def identity_match(
        self, params: IdentityMatchRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Match user identity for package eligibility.

        Override this to provide TMP identity matching.
        """
        return self._not_supported("identity_match")

    # ========================================================================
    # V3 Brand Rights Operations
    # ========================================================================

    async def get_brand_identity(
        self, params: GetBrandIdentityRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get brand identity information.

        Override this in BrandHandler subclasses.
        """
        return self._not_supported("get_brand_identity")

    async def get_rights(
        self, params: GetRightsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Get available rights for licensing.

        Override this in BrandHandler subclasses.
        """
        return self._not_supported("get_rights")

    async def acquire_rights(
        self, params: AcquireRightsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Acquire rights for brand content usage.

        Override this in BrandHandler subclasses.
        """
        return self._not_supported("acquire_rights")

    async def update_rights(
        self, params: UpdateRightsRequest | dict[str, Any], context: ToolContext | None = None
    ) -> Any:
        """Update terms of an existing rights acquisition.

        Override this in BrandHandler subclasses. Partial update: the
        request carries ``rights_id`` plus any subset of the mutable fields
        (``end_date``, ``impression_cap``, ``pricing_option_id``, ``paused``).

        Seller responsibilities you own when implementing this:

        * Reject updates on expired or revoked acquisitions with an
          appropriate error code — do not partial-commit.
        * Reject ``pricing_option_id`` swaps to incompatible options — the
          new option's terms must be a strict superset of the original.
        * Apply all accepted fields atomically — callers should never
          observe a half-applied update on failure.
        """
        return self._not_supported("update_rights")

    # ========================================================================
    # V3 Compliance Operations
    # ========================================================================

    async def comply_test_controller(
        self,
        params: ComplyTestControllerRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> Any:
        """Compliance test controller (sandbox only).

        Override this in ComplianceHandler subclasses.
        """
        return self._not_supported("comply_test_controller")

Base class for ADCP operation handlers.

Subclass this to implement ADCP operations. All operations have default implementations that return 'not supported', allowing you to implement only the operations your agent supports.
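The override-only-what-you-support pattern can be sketched without the library installed. `MiniHandler` below is a two-operation stand-in for `ADCPHandler`, and a plain dict stands in for `NotImplementedResponse`. Both are assumptions for illustration, as is the `product_id` field.

```python
from __future__ import annotations

import asyncio
from typing import Any


class MiniHandler:
    """Two-operation stand-in for ADCPHandler (hypothetical, for illustration)."""

    _agent_type: str = "this agent"

    def _not_supported(self, operation: str) -> dict[str, Any]:
        # The real base class returns a NotImplementedResponse; a dict stands in.
        return {
            "supported": False,
            "reason": f"{operation} is not supported by {self._agent_type}",
        }

    async def get_products(self, params: dict[str, Any], context: Any = None) -> Any:
        return self._not_supported("get_products")

    async def get_signals(self, params: dict[str, Any], context: Any = None) -> Any:
        return self._not_supported("get_signals")


class MySeller(MiniHandler):
    """Implements get_products only; get_signals keeps the default."""

    _agent_type = "my-seller"

    async def get_products(self, params: dict[str, Any], context: Any = None) -> Any:
        return {"products": [{"product_id": "homepage-banner"}]}


seller = MySeller()
products = asyncio.run(seller.get_products({}))
signals = asyncio.run(seller.get_signals({}))
```

Setting `_agent_type` on the subclass changes the wording of every inherited "not supported" message, so callers can tell which agent declined the operation.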

For protocol-specific handlers, use:

  • ContentStandardsHandler: For content standards agents
  • SponsoredIntelligenceHandler: For sponsored intelligence agents
  • GovernanceHandler: For governance agents
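The `update_rights` docstring in the class source above asks implementers to reject updates on expired or revoked acquisitions and to apply accepted fields atomically. One way to meet that, sketched under stated assumptions, is to validate everything first and commit with a single assignment. The in-memory `store`, its `status` values, the `UpdateRejected` error, and the non-negative `impression_cap` rule are all hypothetical. The `pricing_option_id` compatibility check is omitted.

```python
from __future__ import annotations

import copy
from typing import Any

# Mutable fields named in the update_rights docstring.
MUTABLE_FIELDS = {"end_date", "impression_cap", "pricing_option_id", "paused"}


class UpdateRejected(Exception):
    """Hypothetical error raised when an update must be refused."""


def apply_rights_update(
    store: dict[str, dict[str, Any]], request: dict[str, Any]
) -> dict[str, Any]:
    """Validate every field first, then commit all changes in one step."""
    rights_id = request["rights_id"]
    current = store.get(rights_id)
    if current is None:
        raise UpdateRejected(f"unknown rights_id: {rights_id}")
    if current.get("status") in {"expired", "revoked"}:
        raise UpdateRejected(f"acquisition is {current['status']}")

    changes = {key: value for key, value in request.items() if key != "rights_id"}
    unexpected = set(changes) - MUTABLE_FIELDS
    if unexpected:
        raise UpdateRejected(f"fields are not mutable: {sorted(unexpected)}")
    cap = changes.get("impression_cap")
    if cap is not None and cap < 0:
        # Assumed rule, included so one bad field can fail the whole update.
        raise UpdateRejected("impression_cap must be non-negative")

    # Build the new record off to the side, then swap it in with one assignment.
    updated = copy.deepcopy(current)
    updated.update(changes)
    store[rights_id] = updated
    return updated


store = {"r1": {"status": "active", "paused": False, "impression_cap": 1000}}
apply_rights_update(store, {"rights_id": "r1", "paused": True})

rejected = False
try:
    # One invalid field causes the whole request to be refused.
    apply_rights_update(store, {"rights_id": "r1", "paused": False, "impression_cap": -5})
except UpdateRejected:
    rejected = True
```

After the rejected call, the stored record still has `paused` set to `True` from the first update, so no half-applied state is visible. A database-backed seller would typically get the same guarantee from a transaction.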

Ancestors

  • abc.ABC

Subclasses

Methods

async def acquire_rights(self,
params: AcquireRightsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def acquire_rights(
    self, params: AcquireRightsRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Acquire rights for brand content usage.

    Override this in BrandHandler subclasses.
    """
    return self._not_supported("acquire_rights")

Acquire rights for brand content usage.

Override this in BrandHandler subclasses.

async def activate_signal(self,
params: ActivateSignalRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def activate_signal(
    self, params: ActivateSignalRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Activate a signal.

    Override this to handle signal activation.
    """
    return self._not_supported("activate_signal")

Activate a signal.

Override this to handle signal activation.

async def build_creative(self,
params: BuildCreativeRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def build_creative(
    self, params: BuildCreativeRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Build a creative.

    Override this to build creatives from assets.
    """
    return self._not_supported("build_creative")

Build a creative.

Override this to build creatives from assets.

async def calibrate_content(self,
params: CalibrateContentRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def calibrate_content(
    self, params: CalibrateContentRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Calibrate content against standards.

    Override this in ContentStandardsHandler subclasses.
    """
    return self._not_supported("calibrate_content")

Calibrate content against standards.

Override this in ContentStandardsHandler subclasses.

async def check_governance(self,
params: CheckGovernanceRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def check_governance(
    self, params: CheckGovernanceRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Check an action against campaign governance.

    Override this in GovernanceHandler subclasses.
    """
    return self._not_supported("check_governance")

Check an action against campaign governance.

Override this in GovernanceHandler subclasses.

async def comply_test_controller(self,
params: ComplyTestControllerRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def comply_test_controller(
    self,
    params: ComplyTestControllerRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """Compliance test controller (sandbox only).

    Override this in ComplianceHandler subclasses.
    """
    return self._not_supported("comply_test_controller")

Compliance test controller (sandbox only).

Override this in ComplianceHandler subclasses.

async def context_match(self,
params: ContextMatchRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def context_match(
    self, params: ContextMatchRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Match ad context to buyer packages.

    Override this to provide TMP context matching.
    """
    return self._not_supported("context_match")

Match ad context to buyer packages.

Override this to provide TMP context matching.

async def create_collection_list(self,
params: CreateCollectionListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def create_collection_list(
    self,
    params: CreateCollectionListRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """Create a collection list for governance filtering.

    Override this in GovernanceHandler subclasses.
    """
    return self._not_supported("create_collection_list")

Create a collection list for governance filtering.

Override this in GovernanceHandler subclasses.

async def create_content_standards(self,
params: CreateContentStandardsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def create_content_standards(
    self,
    params: CreateContentStandardsRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """Create content standards configuration.

    Override this in ContentStandardsHandler subclasses.
    """
    return self._not_supported("create_content_standards")

Create content standards configuration.

Override this in ContentStandardsHandler subclasses.

async def create_media_buy(self,
params: CreateMediaBuyRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def create_media_buy(
    self, params: CreateMediaBuyRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Create a media buy.

    Override this to handle media buy creation.
    """
    return self._not_supported("create_media_buy")

Create a media buy.

Override this to handle media buy creation.
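Every handler signature accepts either a typed request or a plain `dict`, so an override usually starts by normalizing `params`. A minimal sketch follows, assuming the typed requests are Pydantic v2 models that expose `model_dump()`. The helper `params_as_dict`, the `FakeCreateMediaBuyRequest` class, and the `total_budget` field are hypothetical names used so the example runs without the library.

```python
from __future__ import annotations

from typing import Any


def params_as_dict(params: Any) -> dict[str, Any]:
    """Return request parameters as a plain dict, whichever form arrived."""
    if isinstance(params, dict):
        return params
    # Assumption: typed requests are Pydantic v2 models with model_dump().
    return params.model_dump()


class FakeCreateMediaBuyRequest:
    """Stand-in for a typed request; only mimics model_dump()."""

    def __init__(self, **fields: Any) -> None:
        self._fields = fields

    def model_dump(self) -> dict[str, Any]:
        return dict(self._fields)


# Both call styles end up as the same dict inside the handler.
from_dict = params_as_dict({"total_budget": 500})
from_model = params_as_dict(FakeCreateMediaBuyRequest(total_budget=500))
```

An alternative design is to go the other way and validate dicts into the typed model, which gives field validation at the cost of raising on malformed input.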

async def create_property_list(self,
params: CreatePropertyListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def create_property_list(
    self, params: CreatePropertyListRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Create a property list for governance filtering.

    Override this in GovernanceHandler subclasses.
    """
    return self._not_supported("create_property_list")

Create a property list for governance filtering.

Override this in GovernanceHandler subclasses.

async def delete_collection_list(self,
params: DeleteCollectionListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def delete_collection_list(
    self,
    params: DeleteCollectionListRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """Delete a collection list.

    Override this in GovernanceHandler subclasses.
    """
    return self._not_supported("delete_collection_list")

Delete a collection list.

Override this in GovernanceHandler subclasses.

async def delete_property_list(self,
params: DeletePropertyListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def delete_property_list(
    self, params: DeletePropertyListRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Delete a property list.

    Override this in GovernanceHandler subclasses.
    """
    return self._not_supported("delete_property_list")

Delete a property list.

Override this in GovernanceHandler subclasses.

async def get_account_financials(self,
params: GetAccountFinancialsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def get_account_financials(
    self,
    params: GetAccountFinancialsRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """Get account financials.

    Override this to provide account financial reporting.
    """
    return self._not_supported("get_account_financials")

Get account financials.

Override this to provide account financial reporting.

async def get_adcp_capabilities(self,
params: GetAdcpCapabilitiesRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def get_adcp_capabilities(
    self,
    params: GetAdcpCapabilitiesRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """Get ADCP capabilities.

    Override this to advertise your agent's capabilities.
    """
    return self._not_supported("get_adcp_capabilities")

Get ADCP capabilities.

Override this to advertise your agent's capabilities.

async def get_brand_identity(self,
params: GetBrandIdentityRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def get_brand_identity(
    self, params: GetBrandIdentityRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Get brand identity information.

    Override this in BrandHandler subclasses.
    """
    return self._not_supported("get_brand_identity")

Get brand identity information.

Override this in BrandHandler subclasses.

async def get_collection_list(self,
params: GetCollectionListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def get_collection_list(
    self, params: GetCollectionListRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Get a collection list with optional resolution.

    Override this in GovernanceHandler subclasses.
    """
    return self._not_supported("get_collection_list")

Get a collection list with optional resolution.

Override this in GovernanceHandler subclasses.

async def get_content_standards(self,
params: GetContentStandardsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def get_content_standards(
    self,
    params: GetContentStandardsRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """Get content standards configuration.

    Override this in ContentStandardsHandler subclasses.
    """
    return self._not_supported("get_content_standards")

Get content standards configuration.

Override this in ContentStandardsHandler subclasses.

async def get_creative_delivery(self,
params: GetCreativeDeliveryRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def get_creative_delivery(
    self,
    params: GetCreativeDeliveryRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """Get creative delivery metrics.

    Override this to provide functionality.
    """
    return self._not_supported("get_creative_delivery")

Get creative delivery metrics.

Override this to provide functionality.

async def get_creative_features(self,
params: GetCreativeFeaturesRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def get_creative_features(
    self,
    params: GetCreativeFeaturesRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """Evaluate governance features for a creative.

    Override this in GovernanceHandler subclasses.
    """
    return self._not_supported("get_creative_features")

Evaluate governance features for a creative.

Override this in GovernanceHandler subclasses.

async def get_media_buy_artifacts(self,
params: GetMediaBuyArtifactsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def get_media_buy_artifacts(
    self,
    params: GetMediaBuyArtifactsRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """Get artifacts associated with a media buy.

    Override this in ContentStandardsHandler subclasses.
    """
    return self._not_supported("get_media_buy_artifacts")

Get artifacts associated with a media buy.

Override this in ContentStandardsHandler subclasses.

async def get_media_buy_delivery(self,
params: GetMediaBuyDeliveryRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def get_media_buy_delivery(
    self,
    params: GetMediaBuyDeliveryRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """Get media buy delivery metrics.

    Override this to provide delivery reporting.
    """
    return self._not_supported("get_media_buy_delivery")

Get media buy delivery metrics.

Override this to provide delivery reporting.

async def get_media_buys(self,
params: GetMediaBuysRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def get_media_buys(
    self, params: GetMediaBuysRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Get media buys with status and optional delivery snapshots.

    Override this to provide media buy listing functionality.
    """
    return self._not_supported("get_media_buys")

Get media buys with status and optional delivery snapshots.

Override this to provide media buy listing functionality.

async def get_plan_audit_logs(self,
params: GetPlanAuditLogsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def get_plan_audit_logs(
    self, params: GetPlanAuditLogsRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Retrieve governance audit logs for plans.

    Override this in GovernanceHandler subclasses.
    """
    return self._not_supported("get_plan_audit_logs")

Retrieve governance audit logs for plans.

Override this in GovernanceHandler subclasses.

async def get_products(self,
params: GetProductsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def get_products(
    self, params: GetProductsRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Get advertising products.

    Override this to provide product catalog functionality.
    """
    return self._not_supported("get_products")

Get advertising products.

Override this to provide product catalog functionality.
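The override pattern shared by all of these methods can be sketched without the framework installed. The block below uses a stand-in base class, not the real `ADCPHandler`: `SketchHandler`, the dict returned by its `_not_supported`, and `DEMO_PRODUCTS` are assumptions made for illustration, and the real `_not_supported` may return a different object.

```python
import asyncio
from typing import Any


class SketchHandler:
    """Stand-in for ADCPHandler (hypothetical, for illustration only)."""

    def _not_supported(self, task: str) -> dict[str, Any]:
        # Assumed result shape; the real framework may return something else.
        return {"supported": False, "reason": f"{task} is not supported"}

    async def get_products(self, params: dict[str, Any], context: Any = None) -> Any:
        return self._not_supported("get_products")

    async def get_signals(self, params: dict[str, Any], context: Any = None) -> Any:
        return self._not_supported("get_signals")


DEMO_PRODUCTS = [{"product_id": "demo-1", "name": "Demo placement"}]  # hypothetical data


class MySeller(SketchHandler):
    # Only get_products is overridden; get_signals keeps the default.
    async def get_products(self, params: dict[str, Any], context: Any = None) -> Any:
        return {"products": DEMO_PRODUCTS}


async def demo() -> tuple[Any, Any]:
    seller = MySeller()
    return await seller.get_products({}), await seller.get_signals({})


products_result, signals_result = asyncio.run(demo())
```

In this sketch the overridden task returns the catalog, while the task left untouched still answers with the "not supported" default.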

async def get_property_list(self,
params: GetPropertyListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def get_property_list(
    self, params: GetPropertyListRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Get a property list with optional resolution.

    Override this in GovernanceHandler subclasses.
    """
    return self._not_supported("get_property_list")

Get a property list with optional resolution.

Override this in GovernanceHandler subclasses.

async def get_rights(self,
params: GetRightsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def get_rights(
    self, params: GetRightsRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Get available rights for licensing.

    Override this in BrandHandler subclasses.
    """
    return self._not_supported("get_rights")

Get available rights for licensing.

Override this in BrandHandler subclasses.

async def get_signals(self,
params: GetSignalsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def get_signals(
    self, params: GetSignalsRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Get available signals.

    Override this to provide signal catalog.
    """
    return self._not_supported("get_signals")

Get available signals.

Override this to provide signal catalog.

async def identity_match(self,
params: IdentityMatchRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def identity_match(
    self, params: IdentityMatchRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Match user identity for package eligibility.

    Override this to provide TMP identity matching.
    """
    return self._not_supported("identity_match")

Match user identity for package eligibility.

Override this to provide TMP identity matching.

async def list_accounts(self,
params: ListAccountsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def list_accounts(
    self, params: ListAccountsRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """List accounts.

    Override this to provide functionality.
    """
    return self._not_supported("list_accounts")

List accounts.

Override this to provide functionality.

async def list_collection_lists(self,
params: ListCollectionListsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def list_collection_lists(
    self,
    params: ListCollectionListsRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """List collection lists.

    Override this in GovernanceHandler subclasses.
    """
    return self._not_supported("list_collection_lists")

List collection lists.

Override this in GovernanceHandler subclasses.

async def list_content_standards(self,
params: ListContentStandardsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def list_content_standards(
    self,
    params: ListContentStandardsRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """List content standards configurations.

    Override this in ContentStandardsHandler subclasses.
    """
    return self._not_supported("list_content_standards")

List content standards configurations.

Override this in ContentStandardsHandler subclasses.

async def list_creative_formats(self,
params: ListCreativeFormatsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def list_creative_formats(
    self,
    params: ListCreativeFormatsRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """List supported creative formats.

    Override this to provide creative format information.
    """
    return self._not_supported("list_creative_formats")

List supported creative formats.

Override this to provide creative format information.

async def list_creatives(self,
params: ListCreativesRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def list_creatives(
    self, params: ListCreativesRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """List creatives.

    Override this to list synced creatives.
    """
    return self._not_supported("list_creatives")

List creatives.

Override this to list synced creatives.

async def list_property_lists(self,
params: ListPropertyListsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def list_property_lists(
    self, params: ListPropertyListsRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """List property lists.

    Override this in GovernanceHandler subclasses.
    """
    return self._not_supported("list_property_lists")

List property lists.

Override this in GovernanceHandler subclasses.

async def log_event(self,
params: LogEventRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def log_event(
    self, params: LogEventRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Log event.

    Override this to provide functionality.
    """
    return self._not_supported("log_event")

Log event.

Override this to provide functionality.

async def preview_creative(self,
params: PreviewCreativeRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def preview_creative(
    self, params: PreviewCreativeRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Preview a creative rendering.

    Override this to provide creative preview functionality.
    """
    return self._not_supported("preview_creative")

Preview a creative rendering.

Override this to provide creative preview functionality.

async def provide_performance_feedback(self,
params: ProvidePerformanceFeedbackRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def provide_performance_feedback(
    self,
    params: ProvidePerformanceFeedbackRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """Provide performance feedback.

    Override this to handle performance feedback ingestion.
    """
    return self._not_supported("provide_performance_feedback")

Provide performance feedback.

Override this to handle performance feedback ingestion.

async def report_plan_outcome(self,
params: ReportPlanOutcomeRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def report_plan_outcome(
    self, params: ReportPlanOutcomeRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Report the outcome of a governed action.

    Override this in GovernanceHandler subclasses.
    """
    return self._not_supported("report_plan_outcome")

Report the outcome of a governed action.

Override this in GovernanceHandler subclasses.

async def report_usage(self,
params: ReportUsageRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def report_usage(
    self, params: ReportUsageRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Report account usage.

    Override this to ingest account usage.
    """
    return self._not_supported("report_usage")

Report account usage.

Override this to ingest account usage.

async def si_get_offering(self,
params: SiGetOfferingRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def si_get_offering(
    self, params: SiGetOfferingRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Get sponsored intelligence offering.

    Override this in SponsoredIntelligenceHandler subclasses.
    """
    return self._not_supported("si_get_offering")

Get sponsored intelligence offering.

Override this in SponsoredIntelligenceHandler subclasses.

async def si_initiate_session(self,
params: SiInitiateSessionRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def si_initiate_session(
    self, params: SiInitiateSessionRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Initiate sponsored intelligence session.

    Override this in SponsoredIntelligenceHandler subclasses.
    """
    return self._not_supported("si_initiate_session")

Initiate sponsored intelligence session.

Override this in SponsoredIntelligenceHandler subclasses.

async def si_send_message(self,
params: SiSendMessageRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def si_send_message(
    self, params: SiSendMessageRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Send message in sponsored intelligence session.

    Override this in SponsoredIntelligenceHandler subclasses.
    """
    return self._not_supported("si_send_message")

Send message in sponsored intelligence session.

Override this in SponsoredIntelligenceHandler subclasses.

async def si_terminate_session(self,
params: SiTerminateSessionRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def si_terminate_session(
    self, params: SiTerminateSessionRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Terminate sponsored intelligence session.

    Override this in SponsoredIntelligenceHandler subclasses.
    """
    return self._not_supported("si_terminate_session")

Terminate sponsored intelligence session.

Override this in SponsoredIntelligenceHandler subclasses.

async def sync_accounts(self,
params: SyncAccountsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def sync_accounts(
    self, params: SyncAccountsRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Sync accounts.

    Override this to provide functionality.
    """
    return self._not_supported("sync_accounts")

Sync accounts.

Override this to provide functionality.

async def sync_audiences(self,
params: SyncAudiencesRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def sync_audiences(
    self, params: SyncAudiencesRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Sync audiences.

    Override this to provide audience synchronization.
    """
    return self._not_supported("sync_audiences")

Sync audiences.

Override this to provide audience synchronization.

async def sync_catalogs(self,
params: SyncCatalogsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def sync_catalogs(
    self, params: SyncCatalogsRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Sync catalogs.

    Override this to provide catalog synchronization.
    """
    return self._not_supported("sync_catalogs")

Sync catalogs.

Override this to provide catalog synchronization.

async def sync_creatives(self,
params: SyncCreativesRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def sync_creatives(
    self, params: SyncCreativesRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Sync creatives.

    Override this to handle creative synchronization.
    """
    return self._not_supported("sync_creatives")

Sync creatives.

Override this to handle creative synchronization.

async def sync_event_sources(self,
params: SyncEventSourcesRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def sync_event_sources(
    self, params: SyncEventSourcesRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Sync event sources.

    Override this to provide functionality.
    """
    return self._not_supported("sync_event_sources")

Sync event sources.

Override this to provide functionality.

async def sync_governance(self,
params: SyncGovernanceRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def sync_governance(
    self, params: SyncGovernanceRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Sync governance agents for accounts.

    Override this to handle governance agent registration.
    """
    return self._not_supported("sync_governance")

Sync governance agents for accounts.

Override this to handle governance agent registration.

async def sync_plans(self,
params: SyncPlansRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def sync_plans(
    self, params: SyncPlansRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Sync campaign governance plans.

    Override this in GovernanceHandler subclasses.
    """
    return self._not_supported("sync_plans")

Sync campaign governance plans.

Override this in GovernanceHandler subclasses.

async def update_collection_list(self,
params: UpdateCollectionListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def update_collection_list(
    self,
    params: UpdateCollectionListRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """Update a collection list.

    Override this in GovernanceHandler subclasses.
    """
    return self._not_supported("update_collection_list")

Update a collection list.

Override this in GovernanceHandler subclasses.

async def update_content_standards(self,
params: UpdateContentStandardsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def update_content_standards(
    self,
    params: UpdateContentStandardsRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """Update content standards configuration.

    Override this in ContentStandardsHandler subclasses.
    """
    return self._not_supported("update_content_standards")

Update content standards configuration.

Override this in ContentStandardsHandler subclasses.

async def update_media_buy(self,
params: UpdateMediaBuyRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def update_media_buy(
    self, params: UpdateMediaBuyRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Update a media buy.

    Override this to handle media buy updates.
    """
    return self._not_supported("update_media_buy")

Update a media buy.

Override this to handle media buy updates.

async def update_property_list(self,
params: UpdatePropertyListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def update_property_list(
    self, params: UpdatePropertyListRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Update a property list.

    Override this in GovernanceHandler subclasses.
    """
    return self._not_supported("update_property_list")

Update a property list.

Override this in GovernanceHandler subclasses.

async def update_rights(self,
params: UpdateRightsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def update_rights(
    self, params: UpdateRightsRequest | dict[str, Any], context: ToolContext | None = None
) -> Any:
    """Update terms of an existing rights acquisition.

    Override this in BrandHandler subclasses. Partial update: the
    request carries ``rights_id`` plus any subset of the mutable fields
    (``end_date``, ``impression_cap``, ``pricing_option_id``, ``paused``).

    Seller responsibilities you own when implementing this:

    * Reject updates on expired or revoked acquisitions with an
      appropriate error code — do not partial-commit.
    * Reject ``pricing_option_id`` swaps to incompatible options — the
      new option's terms must be a strict superset of the original.
    * Apply all accepted fields atomically — callers should never
      observe a half-applied update on failure.
    """
    return self._not_supported("update_rights")

Update terms of an existing rights acquisition.

Override this in BrandHandler subclasses. Partial update: the request carries rights_id plus any subset of the mutable fields (end_date, impression_cap, pricing_option_id, paused).

Seller responsibilities you own when implementing this:

  • Reject updates on expired or revoked acquisitions with an appropriate error code — do not partial-commit.
  • Reject pricing_option_id swaps to incompatible options — the new option's terms must be a strict superset of the original.
  • Apply all accepted fields atomically — callers should never observe a half-applied update on failure.
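The three responsibilities above can be sketched as a validate-then-commit function over an in-memory store. This is a minimal illustration under stated assumptions, not the framework's implementation: `Acquisition`, `COMPATIBLE_SWAPS`, `apply_rights_update`, and the error strings are all hypothetical, and a real seller would return proper AdCP error responses and use a database transaction.

```python
from dataclasses import dataclass, replace
from typing import Any

# Mutable fields named in the update_rights docstring.
MUTABLE_FIELDS = {"end_date", "impression_cap", "pricing_option_id", "paused"}


@dataclass(frozen=True)
class Acquisition:  # hypothetical record, not an adcp type
    rights_id: str
    status: str
    end_date: str
    impression_cap: int
    pricing_option_id: str
    paused: bool = False


# Hypothetical compatibility table: current option -> options it may be swapped to.
COMPATIBLE_SWAPS = {"basic": {"premium"}}


def apply_rights_update(store: dict[str, Acquisition], request: dict[str, Any]) -> dict[str, Any]:
    """Run every check first, then commit with a single assignment."""
    current = store.get(request["rights_id"])
    if current is None:
        return {"error": "NOT_FOUND"}  # placeholder error strings
    if current.status in {"expired", "revoked"}:
        return {"error": "NOT_UPDATABLE"}

    changes = {k: v for k, v in request.items() if k in MUTABLE_FIELDS}
    new_option = changes.get("pricing_option_id")
    if new_option is not None and new_option != current.pricing_option_id:
        if new_option not in COMPATIBLE_SWAPS.get(current.pricing_option_id, set()):
            return {"error": "INCOMPATIBLE_PRICING_OPTION"}

    # All checks passed: build the new record, then swap it in at once.
    store[current.rights_id] = replace(current, **changes)
    return {"rights_id": current.rights_id, "updated": sorted(changes)}


store = {"r1": Acquisition("r1", "active", "2025-12-31", 1000, "basic")}
ok = apply_rights_update(store, {"rights_id": "r1", "impression_cap": 5000, "paused": True})
bad = apply_rights_update(
    store, {"rights_id": "r1", "end_date": "2026-06-30", "pricing_option_id": "other"}
)
```

Because the record is immutable and replaced in one step, the rejected second request leaves `end_date` untouched even though that field on its own would have been acceptable.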
async def validate_content_delivery(self,
params: ValidateContentDeliveryRequest | dict[str, Any],
context: ToolContext | None = None) ‑> Any
async def validate_content_delivery(
    self,
    params: ValidateContentDeliveryRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> Any:
    """Validate content delivery against standards.

    Override this in ContentStandardsHandler subclasses.
    """
    return self._not_supported("validate_content_delivery")

Validate content delivery against standards.

Override this in ContentStandardsHandler subclasses.

class ADCPServerBuilder (name: str, *, version: str = '1.0.0')
class ADCPServerBuilder:
    """Declarative server builder using decorators.

    Use ``adcp_server()`` to create an instance, then register handlers
    with decorators. The builder can be passed directly to ``serve()``.

    Example::

        server = adcp_server("my-seller")

        @server.get_products
        async def get_products(params, context=None):
            return products_response(MY_PRODUCTS)

        serve(server, name="my-seller")
    """

    def __init__(self, name: str, *, version: str = "1.0.0") -> None:
        self.name = name
        self.version = version
        self._handlers: dict[str, Callable[..., Any]] = {}

    def __getattr__(self, task_name: str) -> Callable[..., Any]:
        """Return a decorator that registers a handler for the given task."""
        if task_name.startswith("_"):
            raise AttributeError(task_name)

        def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
            if (
                task_name not in HANDLER_TO_DOMAIN
                and task_name != "get_adcp_capabilities"
            ):
                raise ValueError(
                    f"'{task_name}' is not a known ADCP task. "
                    f"Check for typos."
                )
            self._handlers[task_name] = fn
            return fn

        return decorator

    def _detect_domains(self) -> list[str]:
        """Detect which ADCP domains the registered handlers cover."""
        domains: set[str] = set()
        for handler_name in self._handlers:
            domain = HANDLER_TO_DOMAIN.get(handler_name)
            if domain:
                domains.add(domain)
        return sorted(domains)

    def build_handler(self) -> ADCPHandler:
        """Build an ADCPHandler from registered decorators.

        If ``get_adcp_capabilities`` is not registered, it will be
        auto-generated from the detected domains.
        """
        handlers = dict(self._handlers)

        # Auto-generate capabilities if not provided
        if "get_adcp_capabilities" not in handlers:
            domains = self._detect_domains()
            if domains:
                from adcp.server.responses import capabilities_response

                async def auto_capabilities(
                    params: Any, context: Any = None
                ) -> dict[str, Any]:
                    return capabilities_response(domains)

                handlers["get_adcp_capabilities"] = auto_capabilities

        # Create a dynamic subclass
        class DynamicHandler(ADCPHandler):
            pass

        for task_name, fn in handlers.items():
            # Wrap standalone functions to accept self
            async def _bound_method(
                self: Any,
                params: Any,
                context: Any = None,
                _fn: Callable[..., Any] = fn,
            ) -> Any:
                return await _fn(params, context)

            setattr(DynamicHandler, task_name, _bound_method)

        return DynamicHandler()

Declarative server builder using decorators.

Use adcp_server() to create an instance, then register handlers with decorators. The builder can be passed directly to serve().

Example:

server = adcp_server("my-seller")

@server.get_products
async def get_products(params, context=None):
    return products_response(MY_PRODUCTS)

serve(server, name="my-seller")
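The registration mechanism behind `@server.get_products` can be reproduced in a few lines. The sketch below mirrors the `__getattr__` approach shown in the source above, but it is a standalone stand-in: `SketchBuilder` and `KNOWN_TASKS` are hypothetical, and the task-to-domain entries are assumptions rather than the contents of the real `HANDLER_TO_DOMAIN`.

```python
from typing import Any, Callable

# Hypothetical subset of a task-to-domain table (assumed values).
KNOWN_TASKS = {"get_products": "media_buy", "get_signals": "signals"}


class SketchBuilder:
    def __init__(self, name: str) -> None:
        self.name = name
        self._handlers: dict[str, Callable[..., Any]] = {}

    def __getattr__(self, task_name: str) -> Callable[..., Any]:
        # Python calls this only when normal attribute lookup fails.
        if task_name.startswith("_"):
            raise AttributeError(task_name)

        def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
            if task_name not in KNOWN_TASKS:
                raise ValueError(f"'{task_name}' is not a known task")
            self._handlers[task_name] = fn
            return fn

        return decorator

    def detect_domains(self) -> list[str]:
        # Domains covered by whatever has been registered so far.
        return sorted({KNOWN_TASKS[name] for name in self._handlers})


server = SketchBuilder("my-seller")


@server.get_products
async def get_products(params, context=None):
    return {"products": []}


try:

    @server.get_prodcuts  # typo: rejected when the decorator is applied
    async def misspelled(params, context=None):
        return {}

except ValueError as exc:
    typo_error = str(exc)
```

Note that the misspelled attribute access itself succeeds; the `ValueError` is raised only when the returned decorator is applied to a function, which is still at import time for module-level handlers.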

Methods

def build_handler(self) ‑> ADCPHandler
def build_handler(self) -> ADCPHandler:
    """Build an ADCPHandler from registered decorators.

    If ``get_adcp_capabilities`` is not registered, it will be
    auto-generated from the detected domains.
    """
    handlers = dict(self._handlers)

    # Auto-generate capabilities if not provided
    if "get_adcp_capabilities" not in handlers:
        domains = self._detect_domains()
        if domains:
            from adcp.server.responses import capabilities_response

            async def auto_capabilities(
                params: Any, context: Any = None
            ) -> dict[str, Any]:
                return capabilities_response(domains)

            handlers["get_adcp_capabilities"] = auto_capabilities

    # Create a dynamic subclass
    class DynamicHandler(ADCPHandler):
        pass

    for task_name, fn in handlers.items():
        # Wrap standalone functions to accept self
        async def _bound_method(
            self: Any,
            params: Any,
            context: Any = None,
            _fn: Callable[..., Any] = fn,
        ) -> Any:
            return await _fn(params, context)

        setattr(DynamicHandler, task_name, _bound_method)

    return DynamicHandler()

Build an ADCPHandler from registered decorators.

If get_adcp_capabilities is not registered, it will be auto-generated from the detected domains.
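The `_fn: Callable[..., Any] = fn` default in the wrapper is what ties each generated method to its own handler. A small standalone sketch (the handler functions and class names here are hypothetical) shows what happens with and without that default, since Python closures look up loop variables at call time:

```python
import asyncio
from typing import Any, Callable


async def products(params: Any, context: Any = None) -> str:
    return "products"


async def signals(params: Any, context: Any = None) -> str:
    return "signals"


def build_classes(handlers: dict[str, Callable[..., Any]]) -> tuple[type, type]:
    class LateBound:
        pass

    class EarlyBound:
        pass

    for task_name, fn in handlers.items():

        async def late(self: Any, params: Any, context: Any = None) -> Any:
            # `fn` is resolved when the method is called, after the loop ended.
            return await fn(params, context)

        async def early(
            self: Any, params: Any, context: Any = None, _fn: Callable[..., Any] = fn
        ) -> Any:
            # `_fn` was captured when this function object was created.
            return await _fn(params, context)

        setattr(LateBound, task_name, late)
        setattr(EarlyBound, task_name, early)

    return LateBound, EarlyBound


LateBound, EarlyBound = build_classes({"get_products": products, "get_signals": signals})
late_result = asyncio.run(LateBound().get_products({}))
early_result = asyncio.run(EarlyBound().get_products({}))
```

Here `late_result` is `"signals"` because every late-bound method ends up calling the last handler in the loop, while `early_result` is `"products"` as intended.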

class AccountError (code: str, message: str | None = None, *, suggestion: str | None = None)
class AccountError(Exception):
    """Raised by account resolvers to indicate a specific account error.

    Use this in your resolver to return structured errors for cases
    beyond simple "not found"::

        async def my_resolver(ref):
            account = db.find(ref)
            if not account:
                return None  # auto-returns ACCOUNT_NOT_FOUND
            if account.status == "suspended":
                raise AccountError("ACCOUNT_SUSPENDED", "Account is suspended")
            if account.status == "payment_required":
                raise AccountError("ACCOUNT_PAYMENT_REQUIRED",
                    suggestion="Update payment method at https://...")
            return account
    """

    def __init__(
        self,
        code: str,
        message: str | None = None,
        *,
        suggestion: str | None = None,
    ):
        self.code = code
        self.error_message = message
        self.suggestion = suggestion
        super().__init__(message or code)

Raised by account resolvers to indicate a specific account error.

Use this in your resolver to return structured errors for cases beyond simple "not found":

async def my_resolver(ref):
    account = db.find(ref)
    if not account:
        return None  # auto-returns ACCOUNT_NOT_FOUND
    if account.status == "suspended":
        raise AccountError("ACCOUNT_SUSPENDED", "Account is suspended")
    if account.status == "payment_required":
        raise AccountError("ACCOUNT_PAYMENT_REQUIRED",
            suggestion="Update payment method at https://...")
    return account
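To make the three resolver outcomes concrete (account returned, `None`, `AccountError` raised), here is a minimal sketch of a caller that maps each one to a result dict. It is not the framework's `resolve_account` implementation: `resolve_or_error`, `ACCOUNTS`, and the result shapes are hypothetical, and `AccountError` is re-declared from the source above only so the block runs on its own.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class AccountError(Exception):
    # Re-declared from the class source above so the sketch is self-contained.
    def __init__(self, code: str, message: Optional[str] = None, *, suggestion: Optional[str] = None):
        self.code = code
        self.error_message = message
        self.suggestion = suggestion
        super().__init__(message or code)


# Hypothetical account data.
ACCOUNTS = {"acct-1": {"status": "active"}, "acct-2": {"status": "suspended"}}


async def my_resolver(ref: str) -> Optional[dict[str, Any]]:
    account = ACCOUNTS.get(ref)
    if account is None:
        return None
    if account["status"] == "suspended":
        raise AccountError("ACCOUNT_SUSPENDED", "Account is suspended")
    return account


async def resolve_or_error(
    ref: str, resolver: Callable[[str], Awaitable[Optional[dict[str, Any]]]]
) -> dict[str, Any]:
    """Hypothetical caller: turn resolver outcomes into a result dict."""
    try:
        account = await resolver(ref)
    except AccountError as exc:
        error = {"code": exc.code, "message": exc.error_message or exc.code}
        if exc.suggestion:
            error["suggestion"] = exc.suggestion
        return {"error": error}
    if account is None:
        return {"error": {"code": "ACCOUNT_NOT_FOUND", "message": f"No account for {ref!r}"}}
    return {"account": account}


results = {
    ref: asyncio.run(resolve_or_error(ref, my_resolver))
    for ref in ("acct-1", "acct-2", "acct-3")
}
```

The resolver stays free of response formatting: it only returns, returns `None`, or raises, and the caller decides how each case is reported.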

Ancestors

  • builtins.Exception
  • builtins.BaseException

class BrandHandler
class BrandHandler(ADCPHandler):
    """Handler for brand rights operations.

    Subclass this to implement brand identity and rights management.
    Only brand rights tools will be exposed via MCP.

    Example:
        class MyBrandAgent(BrandHandler):
            async def get_brand_identity(self, params, context=None):
                # Implement brand identity lookup
                pass
    """

    _agent_type = "Brand agents"

Handler for brand rights operations.

Subclass this to implement brand identity and rights management. Only brand rights tools will be exposed via MCP.

Example

class MyBrandAgent(BrandHandler):
    async def get_brand_identity(self, params, context=None):
        # Implement brand identity lookup
        pass

Ancestors

Inherited members

class ComplianceHandler
class ComplianceHandler(ADCPHandler):
    """Handler for compliance test operations.

    Subclass this to implement compliance sandbox testing.
    Only compliance tools will be exposed via MCP.

    Example:
        class MyComplianceAgent(ComplianceHandler):
            async def comply_test_controller(self, params, context=None):
                # Implement test controller
                pass
    """

    _agent_type = "Compliance agents"

Handler for compliance test operations.

Subclass this to implement compliance sandbox testing. Only compliance tools will be exposed via MCP.

Example

class MyComplianceAgent(ComplianceHandler):
    async def comply_test_controller(self, params, context=None):
        # Implement test controller
        pass

Ancestors

Inherited members

class ContentStandardsHandler
class ContentStandardsHandler(ADCPHandler):
    """Handler for Content Standards protocol.

    Subclass this to implement a Content Standards agent. All Content Standards
    operations must be implemented via the handle_* methods.
    The public methods (create_content_standards, etc.) handle validation and
    error handling automatically.

    Non-Content-Standards operations (get_products, create_media_buy, etc.)
    return 'not supported' via the base class.

    Example:
        class MyContentStandardsHandler(ContentStandardsHandler):
            async def handle_create_content_standards(
                self,
                request: CreateContentStandardsRequest,
                context: ToolContext | None = None
            ) -> CreateContentStandardsResponse:
                # Your implementation
                return CreateContentStandardsResponse(...)
    """

    _agent_type: str = "Content Standards agents"

    # ========================================================================
    # Content Standards Operations - Override base class with validation
    # ========================================================================

    async def create_content_standards(
        self,
        params: CreateContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> CreateContentStandardsResponse | NotImplementedResponse:
        """Create content standards configuration.

        Validates params and delegates to handle_create_content_standards.
        """
        try:
            request = CreateContentStandardsRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_create_content_standards(request, context)

    async def get_content_standards(
        self,
        params: GetContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> GetContentStandardsResponse | NotImplementedResponse:
        """Get content standards configuration.

        Validates params and delegates to handle_get_content_standards.
        """
        try:
            request = GetContentStandardsRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_get_content_standards(request, context)

    async def list_content_standards(
        self,
        params: ListContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> ListContentStandardsResponse | NotImplementedResponse:
        """List content standards configurations.

        Validates params and delegates to handle_list_content_standards.
        """
        try:
            request = ListContentStandardsRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_list_content_standards(request, context)

    async def update_content_standards(
        self,
        params: UpdateContentStandardsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> UpdateContentStandardsResponse | NotImplementedResponse:
        """Update content standards configuration.

        Validates params and delegates to handle_update_content_standards.
        """
        try:
            request = UpdateContentStandardsRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_update_content_standards(request, context)

    async def calibrate_content(
        self,
        params: CalibrateContentRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> CalibrateContentResponse | NotImplementedResponse:
        """Calibrate content against standards.

        Validates params and delegates to handle_calibrate_content.
        """
        try:
            request = CalibrateContentRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_calibrate_content(request, context)

    async def validate_content_delivery(
        self,
        params: ValidateContentDeliveryRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> ValidateContentDeliveryResponse | NotImplementedResponse:
        """Validate content delivery against standards.

        Validates params and delegates to handle_validate_content_delivery.
        """
        try:
            request = ValidateContentDeliveryRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_validate_content_delivery(request, context)

    async def get_media_buy_artifacts(
        self,
        params: GetMediaBuyArtifactsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> GetMediaBuyArtifactsResponse | NotImplementedResponse:
        """Get artifacts associated with a media buy.

        Validates params and delegates to handle_get_media_buy_artifacts.
        """
        try:
            request = GetMediaBuyArtifactsRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_get_media_buy_artifacts(request, context)

    # ========================================================================
    # Abstract handlers - Implement these in subclasses
    # ========================================================================

    @abstractmethod
    async def handle_create_content_standards(
        self,
        request: CreateContentStandardsRequest,
        context: ToolContext | None = None,
    ) -> CreateContentStandardsResponse:
        """Handle create content standards request."""
        ...

    @abstractmethod
    async def handle_get_content_standards(
        self,
        request: GetContentStandardsRequest,
        context: ToolContext | None = None,
    ) -> GetContentStandardsResponse:
        """Handle get content standards request."""
        ...

    @abstractmethod
    async def handle_list_content_standards(
        self,
        request: ListContentStandardsRequest,
        context: ToolContext | None = None,
    ) -> ListContentStandardsResponse:
        """Handle list content standards request."""
        ...

    @abstractmethod
    async def handle_update_content_standards(
        self,
        request: UpdateContentStandardsRequest,
        context: ToolContext | None = None,
    ) -> UpdateContentStandardsResponse:
        """Handle update content standards request."""
        ...

    @abstractmethod
    async def handle_calibrate_content(
        self,
        request: CalibrateContentRequest,
        context: ToolContext | None = None,
    ) -> CalibrateContentResponse:
        """Handle calibrate content request."""
        ...

    @abstractmethod
    async def handle_validate_content_delivery(
        self,
        request: ValidateContentDeliveryRequest,
        context: ToolContext | None = None,
    ) -> ValidateContentDeliveryResponse:
        """Handle validate content delivery request."""
        ...

    @abstractmethod
    async def handle_get_media_buy_artifacts(
        self,
        request: GetMediaBuyArtifactsRequest,
        context: ToolContext | None = None,
    ) -> GetMediaBuyArtifactsResponse:
        """Handle get media buy artifacts request."""
        ...

Handler for Content Standards protocol.

Subclass this to implement a Content Standards agent. Implement each Content Standards operation by overriding its handle_* method. The public methods (create_content_standards, etc.) validate the incoming params and return a VALIDATION_ERROR response when validation fails, so each handle_* method only receives a typed request.

Non-Content-Standards operations (get_products, create_media_buy, etc.) return 'not supported' via the base class.

Example

class MyContentStandardsHandler(ContentStandardsHandler):
    async def handle_create_content_standards(
        self,
        request: CreateContentStandardsRequest,
        context: ToolContext | None = None
    ) -> CreateContentStandardsResponse:
        # Your implementation
        return CreateContentStandardsResponse(...)
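The split between public methods and handle_* hooks can be pictured with a small stand-alone sketch. It uses only the standard library and does not import adcp.server: `EchoRequest`, `Rejected`, `parse_request`, `BaseEchoHandler`, and `UpperEcho` are hypothetical stand-ins, and a plain `ValueError` takes the place of Pydantic's `ValidationError`.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Union


@dataclass
class EchoRequest:
    """Hypothetical typed request (stands in for a Pydantic request model)."""
    text: str


@dataclass
class Rejected:
    """Hypothetical stand-in for the structured 'not supported' result."""
    supported: bool
    reason: str


def parse_request(params: Union[EchoRequest, Dict[str, Any]]) -> EchoRequest:
    """Accept a typed request or a raw dict; raise ValueError on bad input."""
    if isinstance(params, EchoRequest):
        return params
    text = params.get("text")
    if not isinstance(text, str):
        raise ValueError("field 'text' must be a string")
    return EchoRequest(text=text)


class BaseEchoHandler(ABC):
    async def echo(self, params, context=None):
        # Public entry point: validate first, then delegate to the hook.
        try:
            request = parse_request(params)
        except ValueError as e:
            return Rejected(supported=False, reason=f"Invalid request: {e}")
        return await self.handle_echo(request, context)

    @abstractmethod
    async def handle_echo(self, request: EchoRequest, context=None) -> str:
        """Subclasses implement only this hook; it always gets a typed request."""


class UpperEcho(BaseEchoHandler):
    async def handle_echo(self, request, context=None):
        return request.text.upper()


ok = asyncio.run(UpperEcho().echo({"text": "hi"}))   # reaches the hook
bad = asyncio.run(UpperEcho().echo({"text": 42}))    # rejected before the hook
```

In this sketch the subclass never sees a raw dict, which is the property the handle_* methods above rely on.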

Methods

async def calibrate_content(self,
params: CalibrateContentRequest | dict[str, Any],
context: ToolContext | None = None) ‑> CalibrateContentResponse1 | CalibrateContentResponse2 | NotImplementedResponse
async def calibrate_content(
    self,
    params: CalibrateContentRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> CalibrateContentResponse | NotImplementedResponse:
    """Calibrate content against standards.

    Validates params and delegates to handle_calibrate_content.
    """
    try:
        request = CalibrateContentRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_calibrate_content(request, context)

Calibrate content against standards.

Validates params and delegates to handle_calibrate_content.
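A caller may want to tell a validation failure apart from a normal result. The following is a minimal sketch that relies only on the fields visible in the source above (`supported`, `reason`, `error.code`); `StubError`, `StubNotImplemented`, and `is_validation_failure` are hypothetical names used for illustration, not part of the library.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class StubError:
    """Hypothetical stand-in mirroring the code/message fields used above."""
    code: str
    message: str


@dataclass
class StubNotImplemented:
    """Hypothetical stand-in mirroring supported/reason/error."""
    supported: bool
    reason: str
    error: Optional[StubError] = None


def is_validation_failure(result: Any) -> bool:
    """True when the result looks like the validation-failure shape above."""
    error = getattr(result, "error", None)
    return (
        getattr(result, "supported", None) is False
        and error is not None
        and getattr(error, "code", None) == "VALIDATION_ERROR"
    )


failure = StubNotImplemented(
    supported=False,
    reason="Invalid request: missing field",
    error=StubError(code="VALIDATION_ERROR", message="missing field"),
)
unsupported = StubNotImplemented(supported=False, reason="not supported")
```

Checking `error.code` rather than `supported` alone keeps an invalid request distinct from an operation the agent simply does not offer.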

async def create_content_standards(self,
params: CreateContentStandardsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> CreateContentStandardsResponse1 | CreateContentStandardsResponse2 | NotImplementedResponse
async def create_content_standards(
    self,
    params: CreateContentStandardsRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> CreateContentStandardsResponse | NotImplementedResponse:
    """Create content standards configuration.

    Validates params and delegates to handle_create_content_standards.
    """
    try:
        request = CreateContentStandardsRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_create_content_standards(request, context)

Create content standards configuration.

Validates params and delegates to handle_create_content_standards.

async def get_content_standards(self,
params: GetContentStandardsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> GetContentStandardsResponse1 | GetContentStandardsResponse2 | NotImplementedResponse
async def get_content_standards(
    self,
    params: GetContentStandardsRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> GetContentStandardsResponse | NotImplementedResponse:
    """Get content standards configuration.

    Validates params and delegates to handle_get_content_standards.
    """
    try:
        request = GetContentStandardsRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_get_content_standards(request, context)

Get content standards configuration.

Validates params and delegates to handle_get_content_standards.

async def get_media_buy_artifacts(self,
params: GetMediaBuyArtifactsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> GetMediaBuyArtifactsResponse1 | GetMediaBuyArtifactsResponse2 | NotImplementedResponse
async def get_media_buy_artifacts(
    self,
    params: GetMediaBuyArtifactsRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> GetMediaBuyArtifactsResponse | NotImplementedResponse:
    """Get artifacts associated with a media buy.

    Validates params and delegates to handle_get_media_buy_artifacts.
    """
    try:
        request = GetMediaBuyArtifactsRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_get_media_buy_artifacts(request, context)

Get artifacts associated with a media buy.

Validates params and delegates to handle_get_media_buy_artifacts.

async def handle_calibrate_content(self,
request: CalibrateContentRequest,
context: ToolContext | None = None) ‑> CalibrateContentResponse1 | CalibrateContentResponse2
@abstractmethod
async def handle_calibrate_content(
    self,
    request: CalibrateContentRequest,
    context: ToolContext | None = None,
) -> CalibrateContentResponse:
    """Handle calibrate content request."""
    ...

Handle calibrate content request.
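Because the handle_* hooks are declared with `@abstractmethod`, a subclass that leaves one out should fail at instantiation time. This assumes the base class ultimately uses `abc.ABCMeta`, which the source shown here does not confirm. The sketch below demonstrates the mechanism with the standard library only; `StubStandardsHandler`, `PartialHandler`, and the returned dict are hypothetical.

```python
from abc import ABC, abstractmethod


class StubStandardsHandler(ABC):
    """Hypothetical base with two abstract hooks, named after the ones above."""

    @abstractmethod
    async def handle_calibrate_content(self, request, context=None):
        ...

    @abstractmethod
    async def handle_get_content_standards(self, request, context=None):
        ...


class PartialHandler(StubStandardsHandler):
    # Only one of the two hooks is implemented.
    async def handle_calibrate_content(self, request, context=None):
        return {"verdict": "pass"}  # placeholder payload for illustration


try:
    PartialHandler()
    instantiated = True
    message = ""
except TypeError as exc:
    # abc refuses to build the object and names the missing hook.
    instantiated = False
    message = str(exc)
```

The error surfaces when the handler object is created, before any request is served.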

async def handle_create_content_standards(self,
request: CreateContentStandardsRequest,
context: ToolContext | None = None) ‑> CreateContentStandardsResponse1 | CreateContentStandardsResponse2
@abstractmethod
async def handle_create_content_standards(
    self,
    request: CreateContentStandardsRequest,
    context: ToolContext | None = None,
) -> CreateContentStandardsResponse:
    """Handle create content standards request."""
    ...

Handle create content standards request.

async def handle_get_content_standards(self,
request: GetContentStandardsRequest,
context: ToolContext | None = None) ‑> GetContentStandardsResponse1 | GetContentStandardsResponse2
@abstractmethod
async def handle_get_content_standards(
    self,
    request: GetContentStandardsRequest,
    context: ToolContext | None = None,
) -> GetContentStandardsResponse:
    """Handle get content standards request."""
    ...

Handle get content standards request.

async def handle_get_media_buy_artifacts(self,
request: GetMediaBuyArtifactsRequest,
context: ToolContext | None = None) ‑> GetMediaBuyArtifactsResponse1 | GetMediaBuyArtifactsResponse2
@abstractmethod
async def handle_get_media_buy_artifacts(
    self,
    request: GetMediaBuyArtifactsRequest,
    context: ToolContext | None = None,
) -> GetMediaBuyArtifactsResponse:
    """Handle get media buy artifacts request."""
    ...

Handle get media buy artifacts request.

async def handle_list_content_standards(self,
request: ListContentStandardsRequest,
context: ToolContext | None = None) ‑> ListContentStandardsResponse1 | ListContentStandardsResponse2
@abstractmethod
async def handle_list_content_standards(
    self,
    request: ListContentStandardsRequest,
    context: ToolContext | None = None,
) -> ListContentStandardsResponse:
    """Handle list content standards request."""
    ...

Handle list content standards request.

async def handle_update_content_standards(self,
request: UpdateContentStandardsRequest,
context: ToolContext | None = None) ‑> UpdateContentStandardsResponse1 | UpdateContentStandardsResponse2
@abstractmethod
async def handle_update_content_standards(
    self,
    request: UpdateContentStandardsRequest,
    context: ToolContext | None = None,
) -> UpdateContentStandardsResponse:
    """Handle update content standards request."""
    ...

Handle update content standards request.

async def handle_validate_content_delivery(self,
request: ValidateContentDeliveryRequest,
context: ToolContext | None = None) ‑> ValidateContentDeliveryResponse1 | ValidateContentDeliveryResponse2
@abstractmethod
async def handle_validate_content_delivery(
    self,
    request: ValidateContentDeliveryRequest,
    context: ToolContext | None = None,
) -> ValidateContentDeliveryResponse:
    """Handle validate content delivery request."""
    ...

Handle validate content delivery request.

async def list_content_standards(self,
params: ListContentStandardsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> ListContentStandardsResponse1 | ListContentStandardsResponse2 | NotImplementedResponse
async def list_content_standards(
    self,
    params: ListContentStandardsRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> ListContentStandardsResponse | NotImplementedResponse:
    """List content standards configurations.

    Validates params and delegates to handle_list_content_standards.
    """
    try:
        request = ListContentStandardsRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_list_content_standards(request, context)

List content standards configurations.

Validates params and delegates to handle_list_content_standards.

async def update_content_standards(self,
params: UpdateContentStandardsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> UpdateContentStandardsResponse1 | UpdateContentStandardsResponse2 | NotImplementedResponse
async def update_content_standards(
    self,
    params: UpdateContentStandardsRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> UpdateContentStandardsResponse | NotImplementedResponse:
    """Update content standards configuration.

    Validates params and delegates to handle_update_content_standards.
    """
    try:
        request = UpdateContentStandardsRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_update_content_standards(request, context)

Update content standards configuration.

Validates params and delegates to handle_update_content_standards.

async def validate_content_delivery(self,
params: ValidateContentDeliveryRequest | dict[str, Any],
context: ToolContext | None = None) ‑> ValidateContentDeliveryResponse1 | ValidateContentDeliveryResponse2 | NotImplementedResponse
async def validate_content_delivery(
    self,
    params: ValidateContentDeliveryRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> ValidateContentDeliveryResponse | NotImplementedResponse:
    """Validate content delivery against standards.

    Validates params and delegates to handle_validate_content_delivery.
    """
    try:
        request = ValidateContentDeliveryRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_validate_content_delivery(request, context)

Validate content delivery against standards.

Validates params and delegates to handle_validate_content_delivery.

class GovernanceHandler
class GovernanceHandler(ADCPHandler):
    """Handler for Governance protocol (Property Lists).

    Subclass this to implement a Governance agent that manages property lists
    for brand safety, compliance scoring, and quality filtering.

    All property list operations must be implemented via the handle_* methods.
    The public methods (create_property_list, etc.) handle validation and
    error handling automatically.

    Non-governance operations (get_products, create_media_buy, etc.)
    return 'not supported' via the base class.

    Example:
        class MyGovernanceHandler(GovernanceHandler):
            async def handle_create_property_list(
                self,
                request: CreatePropertyListRequest,
                context: ToolContext | None = None
            ) -> CreatePropertyListResponse:
                # Store the list definition
                list_id = generate_id()
                # ...
                return CreatePropertyListResponse(list=PropertyList(...))
    """

    _agent_type: str = "Governance agents"

    # ========================================================================
    # Governance Operations - Override base class with validation
    # ========================================================================

    async def get_creative_features(
        self,
        params: GetCreativeFeaturesRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> GetCreativeFeaturesResponse | NotImplementedResponse:
        """Evaluate governance features for a creative manifest."""
        try:
            request = GetCreativeFeaturesRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_get_creative_features(request, context)

    async def sync_plans(
        self,
        params: SyncPlansRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> SyncPlansResponse | NotImplementedResponse:
        """Sync campaign governance plans to the agent."""
        try:
            request = SyncPlansRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_sync_plans(request, context)

    async def check_governance(
        self,
        params: CheckGovernanceRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> CheckGovernanceResponse | NotImplementedResponse:
        """Check whether a proposed or committed action complies with plan governance."""
        try:
            request = CheckGovernanceRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_check_governance(request, context)

    async def report_plan_outcome(
        self,
        params: ReportPlanOutcomeRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> ReportPlanOutcomeResponse | NotImplementedResponse:
        """Report the outcome of a previously governed action."""
        try:
            request = ReportPlanOutcomeRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_report_plan_outcome(request, context)

    async def get_plan_audit_logs(
        self,
        params: GetPlanAuditLogsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> GetPlanAuditLogsResponse | NotImplementedResponse:
        """Retrieve governance audit logs for one or more plans."""
        try:
            request = GetPlanAuditLogsRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_get_plan_audit_logs(request, context)

    async def create_property_list(
        self,
        params: CreatePropertyListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> CreatePropertyListResponse | NotImplementedResponse:
        """Create a property list for governance filtering.

        Validates params and delegates to handle_create_property_list.
        """
        try:
            request = CreatePropertyListRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_create_property_list(request, context)

    async def get_property_list(
        self,
        params: GetPropertyListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> GetPropertyListResponse | NotImplementedResponse:
        """Get a property list with optional resolution.

        Validates params and delegates to handle_get_property_list.
        """
        try:
            request = GetPropertyListRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_get_property_list(request, context)

    async def list_property_lists(
        self,
        params: ListPropertyListsRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> ListPropertyListsResponse | NotImplementedResponse:
        """List property lists.

        Validates params and delegates to handle_list_property_lists.
        """
        try:
            request = ListPropertyListsRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_list_property_lists(request, context)

    async def update_property_list(
        self,
        params: UpdatePropertyListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> UpdatePropertyListResponse | NotImplementedResponse:
        """Update a property list.

        Validates params and delegates to handle_update_property_list.
        """
        try:
            request = UpdatePropertyListRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_update_property_list(request, context)

    async def delete_property_list(
        self,
        params: DeletePropertyListRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> DeletePropertyListResponse | NotImplementedResponse:
        """Delete a property list.

        Validates params and delegates to handle_delete_property_list.
        """
        try:
            request = DeletePropertyListRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_delete_property_list(request, context)

    # ========================================================================
    # Abstract handlers - Implement these in subclasses
    # ========================================================================

    @abstractmethod
    async def handle_get_creative_features(
        self,
        request: GetCreativeFeaturesRequest,
        context: ToolContext | None = None,
    ) -> GetCreativeFeaturesResponse:
        """Handle creative feature evaluation."""
        ...

    @abstractmethod
    async def handle_sync_plans(
        self,
        request: SyncPlansRequest,
        context: ToolContext | None = None,
    ) -> SyncPlansResponse:
        """Handle campaign governance plan sync."""
        ...

    @abstractmethod
    async def handle_check_governance(
        self,
        request: CheckGovernanceRequest,
        context: ToolContext | None = None,
    ) -> CheckGovernanceResponse:
        """Handle a governance check request."""
        ...

    @abstractmethod
    async def handle_report_plan_outcome(
        self,
        request: ReportPlanOutcomeRequest,
        context: ToolContext | None = None,
    ) -> ReportPlanOutcomeResponse:
        """Handle reporting of a governed action outcome."""
        ...

    @abstractmethod
    async def handle_get_plan_audit_logs(
        self,
        request: GetPlanAuditLogsRequest,
        context: ToolContext | None = None,
    ) -> GetPlanAuditLogsResponse:
        """Handle retrieval of governance audit logs."""
        ...

    @abstractmethod
    async def handle_create_property_list(
        self,
        request: CreatePropertyListRequest,
        context: ToolContext | None = None,
    ) -> CreatePropertyListResponse:
        """Handle create property list request."""
        ...

    @abstractmethod
    async def handle_get_property_list(
        self,
        request: GetPropertyListRequest,
        context: ToolContext | None = None,
    ) -> GetPropertyListResponse:
        """Handle get property list request."""
        ...

    @abstractmethod
    async def handle_list_property_lists(
        self,
        request: ListPropertyListsRequest,
        context: ToolContext | None = None,
    ) -> ListPropertyListsResponse:
        """Handle list property lists request."""
        ...

    @abstractmethod
    async def handle_update_property_list(
        self,
        request: UpdatePropertyListRequest,
        context: ToolContext | None = None,
    ) -> UpdatePropertyListResponse:
        """Handle update property list request."""
        ...

    @abstractmethod
    async def handle_delete_property_list(
        self,
        request: DeletePropertyListRequest,
        context: ToolContext | None = None,
    ) -> DeletePropertyListResponse:
        """Handle delete property list request."""
        ...

Handler for Governance protocol (Property Lists).

Subclass this to implement a Governance agent that manages property lists for brand safety, compliance scoring, and quality filtering.

All governance operations, not only the property list ones, must be implemented via the handle_* methods. The public methods (create_property_list, check_governance, etc.) validate the incoming params and return a VALIDATION_ERROR response when validation fails.

Non-governance operations (get_products, create_media_buy, etc.) return 'not supported' via the base class.

Example

class MyGovernanceHandler(GovernanceHandler):
    async def handle_create_property_list(
        self,
        request: CreatePropertyListRequest,
        context: ToolContext | None = None
    ) -> CreatePropertyListResponse:
        # Store the list definition
        list_id = generate_id()
        # ...
        return CreatePropertyListResponse(list=PropertyList(...))
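The storage side of the property list hooks could look roughly like the in-memory sketch below. It is illustrative only: it uses plain dicts and a dataclass instead of the real request and response models, and `StubPropertyList`, `InMemoryPropertyLists`, and the `name` and `list_id` fields are assumptions, since the actual model fields are not shown here. `uuid.uuid4().hex` plays the role of the undefined `generate_id()` in the example above.

```python
import asyncio
import uuid
from dataclasses import dataclass, field


@dataclass
class StubPropertyList:
    """Hypothetical record; the real PropertyList fields may differ."""
    list_id: str
    name: str


@dataclass
class InMemoryPropertyLists:
    """Hypothetical store with hooks named after the ones documented here."""
    _lists: dict = field(default_factory=dict)

    async def handle_create_property_list(self, request: dict, context=None):
        list_id = uuid.uuid4().hex  # stand-in for generate_id()
        created = StubPropertyList(list_id=list_id, name=request["name"])
        self._lists[list_id] = created
        return created

    async def handle_get_property_list(self, request: dict, context=None):
        # Returns None when the id is unknown.
        return self._lists.get(request["list_id"])

    async def handle_delete_property_list(self, request: dict, context=None):
        # True if something was removed, False otherwise.
        return self._lists.pop(request["list_id"], None) is not None


async def demo():
    store = InMemoryPropertyLists()
    created = await store.handle_create_property_list({"name": "premium-news"})
    fetched = await store.handle_get_property_list({"list_id": created.list_id})
    deleted = await store.handle_delete_property_list({"list_id": created.list_id})
    gone = await store.handle_get_property_list({"list_id": created.list_id})
    return created, fetched, deleted, gone


created, fetched, deleted, gone = asyncio.run(demo())
```

A real handler would return the library's response models and would likely persist lists outside process memory.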

Methods

async def check_governance(self,
params: CheckGovernanceRequest | dict[str, Any],
context: ToolContext | None = None) ‑> CheckGovernanceResponse | NotImplementedResponse
async def check_governance(
    self,
    params: CheckGovernanceRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> CheckGovernanceResponse | NotImplementedResponse:
    """Check whether a proposed or committed action complies with plan governance."""
    try:
        request = CheckGovernanceRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_check_governance(request, context)

Check whether a proposed or committed action complies with plan governance.

async def create_property_list(self,
params: CreatePropertyListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> CreatePropertyListResponse | NotImplementedResponse
Expand source code
async def create_property_list(
    self,
    params: CreatePropertyListRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> CreatePropertyListResponse | NotImplementedResponse:
    """Create a property list for governance filtering.

    Validates params and delegates to handle_create_property_list.
    """
    try:
        request = CreatePropertyListRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_create_property_list(request, context)

Create a property list for governance filtering.

Validates params and delegates to handle_create_property_list.

async def delete_property_list(self,
params: DeletePropertyListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> DeletePropertyListResponse | NotImplementedResponse
Expand source code
async def delete_property_list(
    self,
    params: DeletePropertyListRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> DeletePropertyListResponse | NotImplementedResponse:
    """Delete a property list.

    Validates params and delegates to handle_delete_property_list.
    """
    try:
        request = DeletePropertyListRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_delete_property_list(request, context)

Delete a property list.

Validates params and delegates to handle_delete_property_list.

async def get_creative_features(self,
params: GetCreativeFeaturesRequest | dict[str, Any],
context: ToolContext | None = None) ‑> GetCreativeFeaturesResponse1 | GetCreativeFeaturesResponse2 | NotImplementedResponse
Expand source code
async def get_creative_features(
    self,
    params: GetCreativeFeaturesRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> GetCreativeFeaturesResponse | NotImplementedResponse:
    """Evaluate governance features for a creative manifest."""
    try:
        request = GetCreativeFeaturesRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_get_creative_features(request, context)

Evaluate governance features for a creative manifest.

async def get_plan_audit_logs(self,
params: GetPlanAuditLogsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> GetPlanAuditLogsResponse | NotImplementedResponse
Expand source code
async def get_plan_audit_logs(
    self,
    params: GetPlanAuditLogsRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> GetPlanAuditLogsResponse | NotImplementedResponse:
    """Retrieve governance audit logs for one or more plans."""
    try:
        request = GetPlanAuditLogsRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_get_plan_audit_logs(request, context)

Retrieve governance audit logs for one or more plans.

async def get_property_list(self,
params: GetPropertyListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> GetPropertyListResponse | NotImplementedResponse
Expand source code
async def get_property_list(
    self,
    params: GetPropertyListRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> GetPropertyListResponse | NotImplementedResponse:
    """Get a property list with optional resolution.

    Validates params and delegates to handle_get_property_list.
    """
    try:
        request = GetPropertyListRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_get_property_list(request, context)

Get a property list with optional resolution.

Validates params and delegates to handle_get_property_list.

async def handle_check_governance(self,
request: CheckGovernanceRequest,
context: ToolContext | None = None) ‑> CheckGovernanceResponse
Expand source code
@abstractmethod
async def handle_check_governance(
    self,
    request: CheckGovernanceRequest,
    context: ToolContext | None = None,
) -> CheckGovernanceResponse:
    """Handle a governance check request."""
    ...

Handle a governance check request.

async def handle_create_property_list(self,
request: CreatePropertyListRequest,
context: ToolContext | None = None) ‑> CreatePropertyListResponse
Expand source code
@abstractmethod
async def handle_create_property_list(
    self,
    request: CreatePropertyListRequest,
    context: ToolContext | None = None,
) -> CreatePropertyListResponse:
    """Handle create property list request."""
    ...

Handle create property list request.

async def handle_delete_property_list(self,
request: DeletePropertyListRequest,
context: ToolContext | None = None) ‑> DeletePropertyListResponse
Expand source code
@abstractmethod
async def handle_delete_property_list(
    self,
    request: DeletePropertyListRequest,
    context: ToolContext | None = None,
) -> DeletePropertyListResponse:
    """Handle delete property list request."""
    ...

Handle delete property list request.

async def handle_get_creative_features(self,
request: GetCreativeFeaturesRequest,
context: ToolContext | None = None) ‑> GetCreativeFeaturesResponse1 | GetCreativeFeaturesResponse2
Expand source code
@abstractmethod
async def handle_get_creative_features(
    self,
    request: GetCreativeFeaturesRequest,
    context: ToolContext | None = None,
) -> GetCreativeFeaturesResponse:
    """Handle creative feature evaluation."""
    ...

Handle creative feature evaluation.

async def handle_get_plan_audit_logs(self,
request: GetPlanAuditLogsRequest,
context: ToolContext | None = None) ‑> GetPlanAuditLogsResponse
Expand source code
@abstractmethod
async def handle_get_plan_audit_logs(
    self,
    request: GetPlanAuditLogsRequest,
    context: ToolContext | None = None,
) -> GetPlanAuditLogsResponse:
    """Handle retrieval of governance audit logs."""
    ...

Handle retrieval of governance audit logs.

async def handle_get_property_list(self,
request: GetPropertyListRequest,
context: ToolContext | None = None) ‑> GetPropertyListResponse
Expand source code
@abstractmethod
async def handle_get_property_list(
    self,
    request: GetPropertyListRequest,
    context: ToolContext | None = None,
) -> GetPropertyListResponse:
    """Handle get property list request."""
    ...

Handle get property list request.

async def handle_list_property_lists(self,
request: ListPropertyListsRequest,
context: ToolContext | None = None) ‑> ListPropertyListsResponse
Expand source code
@abstractmethod
async def handle_list_property_lists(
    self,
    request: ListPropertyListsRequest,
    context: ToolContext | None = None,
) -> ListPropertyListsResponse:
    """Handle list property lists request."""
    ...

Handle list property lists request.

async def handle_report_plan_outcome(self,
request: ReportPlanOutcomeRequest,
context: ToolContext | None = None) ‑> ReportPlanOutcomeResponse
Expand source code
@abstractmethod
async def handle_report_plan_outcome(
    self,
    request: ReportPlanOutcomeRequest,
    context: ToolContext | None = None,
) -> ReportPlanOutcomeResponse:
    """Handle reporting of a governed action outcome."""
    ...

Handle reporting of a governed action outcome.

async def handle_sync_plans(self,
request: SyncPlansRequest,
context: ToolContext | None = None) ‑> SyncPlansResponse
Expand source code
@abstractmethod
async def handle_sync_plans(
    self,
    request: SyncPlansRequest,
    context: ToolContext | None = None,
) -> SyncPlansResponse:
    """Handle campaign governance plan sync."""
    ...

Handle campaign governance plan sync.

async def handle_update_property_list(self,
request: UpdatePropertyListRequest,
context: ToolContext | None = None) ‑> UpdatePropertyListResponse
Expand source code
@abstractmethod
async def handle_update_property_list(
    self,
    request: UpdatePropertyListRequest,
    context: ToolContext | None = None,
) -> UpdatePropertyListResponse:
    """Handle update property list request."""
    ...

Handle update property list request.

async def list_property_lists(self,
params: ListPropertyListsRequest | dict[str, Any],
context: ToolContext | None = None) ‑> ListPropertyListsResponse | NotImplementedResponse
Expand source code
async def list_property_lists(
    self,
    params: ListPropertyListsRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> ListPropertyListsResponse | NotImplementedResponse:
    """List property lists.

    Validates params and delegates to handle_list_property_lists.
    """
    try:
        request = ListPropertyListsRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_list_property_lists(request, context)

List property lists.

Validates params and delegates to handle_list_property_lists.

async def report_plan_outcome(self,
params: ReportPlanOutcomeRequest | dict[str, Any],
context: ToolContext | None = None) ‑> ReportPlanOutcomeResponse | NotImplementedResponse
Expand source code
async def report_plan_outcome(
    self,
    params: ReportPlanOutcomeRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> ReportPlanOutcomeResponse | NotImplementedResponse:
    """Report the outcome of a previously governed action."""
    try:
        request = ReportPlanOutcomeRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_report_plan_outcome(request, context)

Report the outcome of a previously governed action.

async def sync_plans(self,
params: SyncPlansRequest | dict[str, Any],
context: ToolContext | None = None) ‑> SyncPlansResponse | NotImplementedResponse
Expand source code
async def sync_plans(
    self,
    params: SyncPlansRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> SyncPlansResponse | NotImplementedResponse:
    """Sync campaign governance plans to the agent."""
    try:
        request = SyncPlansRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_sync_plans(request, context)

Sync campaign governance plans to the agent.

async def update_property_list(self,
params: UpdatePropertyListRequest | dict[str, Any],
context: ToolContext | None = None) ‑> UpdatePropertyListResponse | NotImplementedResponse
Expand source code
async def update_property_list(
    self,
    params: UpdatePropertyListRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> UpdatePropertyListResponse | NotImplementedResponse:
    """Update a property list.

    Validates params and delegates to handle_update_property_list.
    """
    try:
        request = UpdatePropertyListRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_update_property_list(request, context)

Update a property list.

Validates params and delegates to handle_update_property_list.

class IdempotencyStore (backend: IdempotencyBackend,
ttl_seconds: int = 86400,
hash_fn: Callable[[dict[str, Any]], str] = <function canonical_json_sha256>,
*,
clock: Callable[[], float] = <built-in function time>)
Expand source code
class IdempotencyStore:
    """Coordinator that binds canonical hashing to a storage backend.

    :param backend: A concrete :class:`IdempotencyBackend`.
    :param ttl_seconds: How long cached responses remain replayable. Must be
        within the spec's ``[3600, 604800]`` range (1h to 7d). 86400 (24h) is
        the recommended floor and matches the compliance storyboard.
    :param hash_fn: Optional override for the canonical hash function. Defaults
        to :func:`canonical_json_sha256`. Exposed for tests and for anyone who
        wants to experiment with alternative equivalence rules — though note
        the spec mandates RFC 8785 JCS for interop.
    """

    def __init__(
        self,
        backend: IdempotencyBackend,
        ttl_seconds: int = 86400,
        hash_fn: Callable[[dict[str, Any]], str] = canonical_json_sha256,
        *,
        clock: Callable[[], float] = time.time,
    ) -> None:
        if not _MIN_TTL_SECONDS <= ttl_seconds <= _MAX_TTL_SECONDS:
            raise ValueError(
                f"ttl_seconds must be in [{_MIN_TTL_SECONDS}, {_MAX_TTL_SECONDS}] "
                f"per AdCP spec (capabilities.idempotency.replay_ttl_seconds), "
                f"got {ttl_seconds}"
            )
        self.backend = backend
        self.ttl_seconds = ttl_seconds
        self._hash_fn = hash_fn
        self._clock = clock

    def capability(self) -> dict[str, Any]:
        """Return the capabilities fragment declaring this store's replay window.

        Embed under ``capabilities.adcp.idempotency`` on the seller's
        ``get_adcp_capabilities`` response. Buyers read this to reason about
        retry-safe windows (AdCP #2315)::

            caps.adcp.idempotency = idempotency.capability()
            # → {"supported": True, "replay_ttl_seconds": 86400}

        ``supported`` became REQUIRED in AdCP 3.0 GA — agents emitting only
        ``replay_ttl_seconds`` fail strict schema validation on the new
        capabilities response.
        """
        return {"supported": True, "replay_ttl_seconds": self.ttl_seconds}

    def wrap(self, handler: HandlerFn) -> HandlerFn:
        """Decorator that adds idempotency semantics to an AdCP handler method.

        The wrapped handler is called as ``handler(self, params, context)``.
        ``params`` may be a dict or a Pydantic model — both are normalized to
        a dict before hashing. The return value is coerced to a dict for
        caching (via ``model_dump`` if Pydantic).

        The decorator always returns the handler's original object on a cache
        miss and a best-effort Pydantic re-validation on a hit (when the
        handler's declared return type exposes ``model_validate``). Callers
        that return raw dicts get dicts back.
        """

        @wraps(handler)
        async def _wrapped(
            handler_self: Any,
            params: Any,
            context: Any = None,
            *args: Any,
            **kwargs: Any,
        ) -> Any:
            principal_id, idempotency_key, params_dict = self._prepare(params, context)
            if principal_id is None or idempotency_key is None:
                # No key → spec says the server MUST reject with INVALID_REQUEST.
                # We let the handler run so validation layers above us (Pydantic,
                # FastAPI, etc.) can reject with a typed error; the middleware's
                # job is only to dedup when a key IS present.
                return await handler(handler_self, params, context, *args, **kwargs)

            payload_hash = self._hash_fn(params_dict)

            cached = await self.backend.get(principal_id, idempotency_key)
            if cached is not None:
                if cached.payload_hash == payload_hash:
                    logger.debug(
                        "idempotency replay: principal=%s key_prefix=%s",
                        principal_id,
                        idempotency_key[:8],
                    )
                    return _clone_response(cached.response)
                # Same key, different payload — spec-defined conflict.
                raise IdempotencyConflictError(
                    operation=getattr(handler, "__name__", "handler"),
                    errors=[
                        {
                            "code": "IDEMPOTENCY_CONFLICT",
                            "message": (
                                "idempotency_key reused with a different payload "
                                "(canonical hash mismatch)"
                            ),
                        }
                    ],
                )

            response = await handler(handler_self, params, context, *args, **kwargs)
            # Deep-copy when caching so post-return mutation of the caller's
            # copy can't poison future replays. `_clone_response` also deep-
            # copies on the hit path, giving independent objects per replay.
            response_dict = copy.deepcopy(_to_dict(response))
            entry = CachedResponse(
                payload_hash=payload_hash,
                response=response_dict,
                expires_at_epoch=self._clock() + self.ttl_seconds,
            )
            # Commit cache AFTER handler returns. Atomicity with the handler's
            # side effects depends on the backend: MemoryBackend is best-effort
            # (no transactional relationship to external resources); PgBackend
            # (follow-up) will commit in the same transaction when the handler
            # uses the same engine. On put failure we log loudly and return
            # the handler's response — swallowing the exception would be wrong
            # (operators need the signal that caching is broken), and raising
            # would look to the caller like the handler failed, triggering a
            # retry that re-executes side effects. Best compromise: warn
            # operators, return the result, and accept that the next retry
            # with this key will re-execute.
            try:
                await self.backend.put(principal_id, idempotency_key, entry)
            except Exception:
                logger.warning(
                    "Idempotency cache put failed for principal=%s key_prefix=%s — "
                    "handler completed but a subsequent retry with this key will "
                    "re-execute rather than replay. This indicates an operational "
                    "issue with the idempotency backend.",
                    principal_id,
                    idempotency_key[:8],
                    exc_info=True,
                )
            return response

        return _wrapped

    def _prepare(self, params: Any, context: Any) -> tuple[str | None, str | None, dict[str, Any]]:
        """Normalize inputs and extract the (principal, key, params_dict) tuple.

        Returns ``(None, None, params_dict)`` when idempotency doesn't apply
        (no caller identity or no key supplied). The caller falls through to
        the plain handler in that case — validation of missing-key lives in
        the request schema, not here.
        """
        params_dict = _to_dict(params)
        idempotency_key = params_dict.get("idempotency_key")
        if not isinstance(idempotency_key, str) or not idempotency_key:
            return None, None, params_dict
        principal_id = _extract_principal_id(context)
        if principal_id is None:
            # No caller identity: we can't safely scope the key. Spec requires
            # per-principal scope; anything else is a cross-principal replay
            # attack surface. Fall through to the handler (which will process
            # the request normally — no dedup, but no security regression).
            self._warn_missing_principal_once()
            return None, None, params_dict
        return principal_id, idempotency_key, params_dict

    _missing_principal_warned: bool = False

    def _warn_missing_principal_once(self) -> None:
        """Emit a one-time warning when the middleware sees a key but no principal.

        Silent fall-through is the worst DX: the seller drops in
        ``@idempotency.wrap``, ships, and doesn't discover until incident
        review that no dedup ever happened. Fire once per store instance so
        operators see the signal without filling logs on every request.
        """
        if self._missing_principal_warned:
            return
        self._missing_principal_warned = True
        warnings.warn(
            "IdempotencyStore received a request with idempotency_key but no "
            "caller_identity on ToolContext — dedup is SKIPPED. This usually "
            "means your transport isn't populating the authenticated principal. "
            "A2A: wire an a2a-sdk auth middleware that sets ServerCallContext.user; "
            "MCP: populate ToolContext.caller_identity from your FastMCP auth "
            "middleware (see adcp.server.idempotency README). "
            "This warning fires once per IdempotencyStore instance.",
            UserWarning,
            stacklevel=3,
        )

Coordinator that binds canonical hashing to a storage backend.

Args

backend
A concrete IdempotencyBackend.
ttl_seconds
How long cached responses remain replayable. Must be within the spec's [3600, 604800] range (1h to 7d). 86400 (24h) is the recommended floor and matches the compliance storyboard.
hash_fn
Optional override for the canonical hash function. Defaults to canonical_json_sha256. Exposed for tests and for anyone who wants to experiment with alternative equivalence rules, though note the spec mandates RFC 8785 JCS for interop.
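
To illustrate why the hash function needs a canonical form, here is a rough stdlib sketch in which two payloads that differ only in key order hash identically. `approx_canonical_sha256` is a hypothetical name, and sorted keys plus compact separators is only an approximation: it is not a full RFC 8785 JCS implementation and is not the library's `canonical_json_sha256`.

```python
import hashlib
import json
from typing import Any


def approx_canonical_sha256(payload: dict[str, Any]) -> str:
    """Hash a payload so that key order does not affect the result.

    Approximation only: sorted keys and compact separators do NOT cover
    every RFC 8785 JCS rule (number and string serialization differ in
    edge cases).
    """
    canonical = json.dumps(
        payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Same content, different key order: the hashes match.
a = approx_canonical_sha256({"budget": 100, "currency": "USD"})
b = approx_canonical_sha256({"currency": "USD", "budget": 100})
# Different content: the hash changes.
c = approx_canonical_sha256({"budget": 200, "currency": "USD"})
```

Any replacement passed as `hash_fn` would need the same property (equal payloads give equal hashes) for replay detection to work.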

Methods

def capability(self) ‑> dict[str, typing.Any]
Expand source code
def capability(self) -> dict[str, Any]:
    """Return the capabilities fragment declaring this store's replay window.

    Embed under ``capabilities.adcp.idempotency`` on the seller's
    ``get_adcp_capabilities`` response. Buyers read this to reason about
    retry-safe windows (AdCP #2315)::

        caps.adcp.idempotency = idempotency.capability()
        # → {"supported": True, "replay_ttl_seconds": 86400}

    ``supported`` became REQUIRED in AdCP 3.0 GA — agents emitting only
    ``replay_ttl_seconds`` fail strict schema validation on the new
    capabilities response.
    """
    return {"supported": True, "replay_ttl_seconds": self.ttl_seconds}

Return the capabilities fragment declaring this store's replay window.

Embed under capabilities.adcp.idempotency on the seller's get_adcp_capabilities response. Buyers read this to reason about retry-safe windows (AdCP #2315):

    caps.adcp.idempotency = idempotency.capability()
    # → {"supported": True, "replay_ttl_seconds": 86400}

supported became REQUIRED in AdCP 3.0 GA — agents emitting only replay_ttl_seconds fail strict schema validation on the new capabilities response.
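
As a rough illustration of the fragment's shape and where it is embedded, the following stdlib sketch builds it and places it in a plain dict. `capability_fragment` and the surrounding `capabilities` dict are hypothetical; the real capabilities response is produced by the framework and may carry many other fields.

```python
from typing import Any

# The TTL range quoted in the class documentation (1h to 7d).
MIN_TTL_SECONDS, MAX_TTL_SECONDS = 3600, 604800


def capability_fragment(ttl_seconds: int) -> dict[str, Any]:
    """Build the idempotency fragment, rejecting out-of-range TTLs."""
    if not MIN_TTL_SECONDS <= ttl_seconds <= MAX_TTL_SECONDS:
        raise ValueError(
            f"ttl_seconds must be in [{MIN_TTL_SECONDS}, {MAX_TTL_SECONDS}], "
            f"got {ttl_seconds}"
        )
    # Both keys are emitted: the docs above note that "supported" is required.
    return {"supported": True, "replay_ttl_seconds": ttl_seconds}


# Hypothetical capabilities payload; only the nesting path comes from the docs.
capabilities: dict[str, Any] = {"adcp": {}}
capabilities["adcp"]["idempotency"] = capability_fragment(86400)
```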

def wrap(self, handler: HandlerFn) ‑> Callable[..., Awaitable[typing.Any]]
Expand source code
def wrap(self, handler: HandlerFn) -> HandlerFn:
    """Decorator that adds idempotency semantics to an AdCP handler method.

    The wrapped handler is called as ``handler(self, params, context)``.
    ``params`` may be a dict or a Pydantic model — both are normalized to
    a dict before hashing. The return value is coerced to a dict for
    caching (via ``model_dump`` if Pydantic).

    The decorator always returns the handler's original object on a cache
    miss and a best-effort Pydantic re-validation on a hit (when the
    handler's declared return type exposes ``model_validate``). Callers
    that return raw dicts get dicts back.
    """

    @wraps(handler)
    async def _wrapped(
        handler_self: Any,
        params: Any,
        context: Any = None,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        principal_id, idempotency_key, params_dict = self._prepare(params, context)
        if principal_id is None or idempotency_key is None:
            # No key → spec says the server MUST reject with INVALID_REQUEST.
            # We let the handler run so validation layers above us (Pydantic,
            # FastAPI, etc.) can reject with a typed error; the middleware's
            # job is only to dedup when a key IS present.
            return await handler(handler_self, params, context, *args, **kwargs)

        payload_hash = self._hash_fn(params_dict)

        cached = await self.backend.get(principal_id, idempotency_key)
        if cached is not None:
            if cached.payload_hash == payload_hash:
                logger.debug(
                    "idempotency replay: principal=%s key_prefix=%s",
                    principal_id,
                    idempotency_key[:8],
                )
                return _clone_response(cached.response)
            # Same key, different payload — spec-defined conflict.
            raise IdempotencyConflictError(
                operation=getattr(handler, "__name__", "handler"),
                errors=[
                    {
                        "code": "IDEMPOTENCY_CONFLICT",
                        "message": (
                            "idempotency_key reused with a different payload "
                            "(canonical hash mismatch)"
                        ),
                    }
                ],
            )

        response = await handler(handler_self, params, context, *args, **kwargs)
        # Deep-copy when caching so post-return mutation of the caller's
        # copy can't poison future replays. `_clone_response` also deep-
        # copies on the hit path, giving independent objects per replay.
        response_dict = copy.deepcopy(_to_dict(response))
        entry = CachedResponse(
            payload_hash=payload_hash,
            response=response_dict,
            expires_at_epoch=self._clock() + self.ttl_seconds,
        )
        # Commit cache AFTER handler returns. Atomicity with the handler's
        # side effects depends on the backend: MemoryBackend is best-effort
        # (no transactional relationship to external resources); PgBackend
        # (follow-up) will commit in the same transaction when the handler
        # uses the same engine. On put failure we log loudly and return
        # the handler's response — swallowing the exception would be wrong
        # (operators need the signal that caching is broken), and raising
        # would look to the caller like the handler failed, triggering a
        # retry that re-executes side effects. Best compromise: warn
        # operators, return the result, and accept that the next retry
        # with this key will re-execute.
        try:
            await self.backend.put(principal_id, idempotency_key, entry)
        except Exception:
            logger.warning(
                "Idempotency cache put failed for principal=%s key_prefix=%s — "
                "handler completed but a subsequent retry with this key will "
                "re-execute rather than replay. This indicates an operational "
                "issue with the idempotency backend.",
                principal_id,
                idempotency_key[:8],
                exc_info=True,
            )
        return response

    return _wrapped

Decorator that adds idempotency semantics to an AdCP handler method.

The wrapped handler is called as handler(self, params, context). params may be a dict or a Pydantic model — both are normalized to a dict before hashing. The return value is coerced to a dict for caching (via model_dump if Pydantic).

The decorator always returns the handler's original object on a cache miss and a best-effort Pydantic re-validation on a hit (when the handler's declared return type exposes model_validate). Callers that return raw dicts get dicts back.
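
The three outcomes of the wrapper (pass-through without a key, replay on a matching payload, conflict on a mismatched payload) can be sketched with the standard library alone. Everything here is a simplified assumption: `idempotent`, `ConflictError`, and the dict cache are hypothetical stand-ins, the hash is a plain sorted-key JSON digest, and TTL expiry is omitted.

```python
import asyncio
import copy
import hashlib
import json
from functools import wraps


class ConflictError(Exception):
    """Hypothetical stand-in for the library's conflict error."""


def idempotent(cache: dict):
    """Sketch of a dedup decorator keyed on (principal, idempotency_key)."""

    def decorator(handler):
        @wraps(handler)
        async def wrapped(params: dict, principal):
            key = params.get("idempotency_key")
            if not key or principal is None:
                # No key or no caller identity: run the handler without dedup.
                return await handler(params, principal)
            digest = hashlib.sha256(
                json.dumps(params, sort_keys=True).encode("utf-8")
            ).hexdigest()
            hit = cache.get((principal, key))
            if hit is not None:
                cached_digest, cached_response = hit
                if cached_digest != digest:
                    raise ConflictError("idempotency_key reused with a different payload")
                # Replay: hand back an independent copy of the stored response.
                return copy.deepcopy(cached_response)
            response = await handler(params, principal)
            # Store a copy so later mutation by the caller cannot alter replays.
            cache[(principal, key)] = (digest, copy.deepcopy(response))
            return response

        return wrapped

    return decorator


calls = []
cache = {}


@idempotent(cache)
async def create_media_buy(params, principal):
    calls.append(params["budget"])
    return {"media_buy_id": f"mb-{len(calls)}"}


async def demo():
    first = await create_media_buy({"idempotency_key": "k1", "budget": 100}, "buyer-a")
    replay = await create_media_buy({"idempotency_key": "k1", "budget": 100}, "buyer-a")
    try:
        await create_media_buy({"idempotency_key": "k1", "budget": 999}, "buyer-a")
        conflict = False
    except ConflictError:
        conflict = True
    return first, replay, conflict


first, replay, conflict = asyncio.run(demo())
```

In this sketch the handler body runs once: the second call is served from the cache and the third is rejected before the handler is invoked.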

class MCPToolSet (handler: ADCPHandler)
Expand source code
class MCPToolSet:
    """Collection of MCP tools from an ADCP handler.

    Provides tool definitions and handlers for registering with an MCP server.
    """

    def __init__(self, handler: ADCPHandler):
        """Create tool set from handler.

        Args:
            handler: ADCP handler instance
        """
        self.handler = handler
        self._filtered_definitions = get_tools_for_handler(handler)
        self._tools: dict[str, Callable[..., Any]] = {}

        # Create tool callers only for filtered tools
        for tool_def in self._filtered_definitions:
            name = tool_def["name"]
            self._tools[name] = create_tool_caller(handler, name)

    @property
    def tool_definitions(self) -> list[dict[str, Any]]:
        """Get MCP tool definitions filtered by handler type."""
        return list(self._filtered_definitions)

    async def call_tool(self, name: str, params: dict[str, Any]) -> Any:
        """Call a tool by name.

        Args:
            name: Tool name
            params: Tool parameters

        Returns:
            Tool result

        Raises:
            KeyError: If tool not found
        """
        if name not in self._tools:
            raise KeyError(f"Unknown tool: {name}")
        return await self._tools[name](params)

    def get_tool_names(self) -> list[str]:
        """Get list of available tool names."""
        return list(self._tools.keys())

Collection of MCP tools from an ADCP handler.

Provides tool definitions and handlers for registering with an MCP server.

Create tool set from handler.

Args

handler
ADCP handler instance

Instance variables

prop tool_definitions : list[dict[str, Any]]
Expand source code
@property
def tool_definitions(self) -> list[dict[str, Any]]:
    """Get MCP tool definitions filtered by handler type."""
    return list(self._filtered_definitions)

Get MCP tool definitions filtered by handler type.

Methods

async def call_tool(self, name: str, params: dict[str, Any]) ‑> Any
Expand source code
async def call_tool(self, name: str, params: dict[str, Any]) -> Any:
    """Call a tool by name.

    Args:
        name: Tool name
        params: Tool parameters

    Returns:
        Tool result

    Raises:
        KeyError: If tool not found
    """
    if name not in self._tools:
        raise KeyError(f"Unknown tool: {name}")
    return await self._tools[name](params)

Call a tool by name.

Args

name
Tool name
params
Tool parameters

Returns

Tool result

Raises

KeyError
If tool not found

def get_tool_names(self) ‑> list[str]
Expand source code
def get_tool_names(self) -> list[str]:
    """Get list of available tool names."""
    return list(self._tools.keys())

Get list of available tool names.
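
The name-to-caller lookup behind call_tool can be sketched as follows. `SketchToolSet` and `EchoHandler` are hypothetical, and binding tools with `getattr` is an assumption made for illustration; the real class builds its callers with `get_tools_for_handler` and `create_tool_caller`.

```python
import asyncio
from typing import Any, Awaitable, Callable


class SketchToolSet:
    """Hypothetical name-to-caller registry, mirroring the lookup in call_tool."""

    def __init__(self, handler: Any, tool_names: list[str]) -> None:
        # Assumption: each tool name maps directly to an async method on the handler.
        self._tools: dict[str, Callable[[dict[str, Any]], Awaitable[Any]]] = {
            name: getattr(handler, name) for name in tool_names
        }

    async def call_tool(self, name: str, params: dict[str, Any]) -> Any:
        if name not in self._tools:
            raise KeyError(f"Unknown tool: {name}")
        return await self._tools[name](params)

    def get_tool_names(self) -> list[str]:
        return list(self._tools.keys())


class EchoHandler:
    async def get_products(self, params: dict[str, Any]) -> dict[str, Any]:
        return {"products": [], "echo": params}


tools = SketchToolSet(EchoHandler(), ["get_products"])
result = asyncio.run(tools.call_tool("get_products", {"brief": "video"}))
```

An unknown name raises `KeyError` before any handler code runs, so a caller can map that exception to a protocol-level "tool not found" error.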

class MemoryBackend (*, clock: Callable[[], float] = <built-in function time>)
Expand source code
class MemoryBackend(IdempotencyBackend):
    """In-process dict-backed store.

    Suitable for tests, single-process reference implementations, and local
    development. **Not suitable for multi-process deployments** — each worker
    has its own cache, so a retry that lands on a different worker is treated
    as a fresh request.

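    Concurrency: the backend uses an :class:`asyncio.Lock` to serialize
    mutations of the shared dict across coroutines on the same event loop.
    :class:`asyncio.Lock` is not thread-safe, so this does not protect access
    from multiple OS threads. Reads go through the lock too; for a pure
    in-process backend this is cheap and prevents torn reads across concurrent
    ``get``/``put`` interleaving.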

    :param clock: Callable returning the current epoch seconds. Override for
        tests that need to advance time deterministically without monkeypatching
        :mod:`time`. Defaults to :func:`time.time`.
    """

    def __init__(self, *, clock: Callable[[], float] = time.time) -> None:
        self._store: dict[tuple[str, str], CachedResponse] = {}
        self._lock = asyncio.Lock()
        self._clock = clock

    async def get(
        self, principal_id: str, key: str
    ) -> CachedResponse | None:
        async with self._lock:
            entry = self._store.get((principal_id, key))
            if entry is None:
                return None
            if entry.expires_at_epoch <= self._clock():
                # Lazy expiry — drop the stale entry so the next request
                # treats the slot as fresh and races to repopulate.
                del self._store[(principal_id, key)]
                return None
            return entry

    async def put(
        self,
        principal_id: str,
        key: str,
        entry: CachedResponse,
    ) -> None:
        async with self._lock:
            self._store[(principal_id, key)] = entry

    async def delete_expired(self, now_epoch: float | None = None) -> int:
        cutoff = now_epoch if now_epoch is not None else self._clock()
        async with self._lock:
            stale = [k for k, v in self._store.items() if v.expires_at_epoch <= cutoff]
            for k in stale:
                del self._store[k]
            return len(stale)

    async def clear(self) -> None:
        """Remove all cached entries.

        Test-suite hook — handy for resetting state between fixtures when a
        single :class:`MemoryBackend` is shared across multiple tests.
        """
        async with self._lock:
            self._store.clear()

    async def _size(self) -> int:
        """Test-only: return the current entry count."""
        async with self._lock:
            return len(self._store)

In-process dict-backed store.

Suitable for tests, single-process reference implementations, and local development. Not suitable for multi-process deployments — each worker has its own cache, so a retry that lands on a different worker is treated as a fresh request.

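Concurrency: the backend uses an asyncio.Lock to serialize mutations of the shared dict across coroutines running on the same event loop. asyncio.Lock is not thread-safe, so this does not protect access from multiple OS threads. Reads go through the lock too; for a pure in-process backend this is cheap and prevents torn reads across concurrent get/put interleaving.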

Args

clock
Callable returning the current epoch seconds. Override for tests that need to advance time deterministically without monkeypatching the time module. Defaults to time.time.
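The clock override and the lazy expiry in get can be exercised together, as in the sketch below. Everything here is a stand-in so the snippet runs without the adcp package: Entry is a hypothetical substitute for CachedResponse (only expires_at_epoch is taken from the source), FakeClock is a hypothetical test helper, and MiniMemoryBackend copies just the get/put logic shown above.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Entry:
    """Hypothetical stand-in for CachedResponse; only the expiry field matters."""
    payload: str
    expires_at_epoch: float


class FakeClock:
    """Hypothetical controllable clock: call it to read, advance() to move time."""

    def __init__(self, start: float = 1000.0) -> None:
        self.now = start

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


class MiniMemoryBackend:
    """Stand-in mirroring the get/put logic of the source shown above."""

    def __init__(self, *, clock) -> None:
        self._store: dict[tuple[str, str], Entry] = {}
        self._lock = asyncio.Lock()
        self._clock = clock

    async def get(self, principal_id: str, key: str):
        async with self._lock:
            entry = self._store.get((principal_id, key))
            if entry is None:
                return None
            if entry.expires_at_epoch <= self._clock():
                del self._store[(principal_id, key)]  # lazy expiry on read
                return None
            return entry

    async def put(self, principal_id: str, key: str, entry: Entry) -> None:
        async with self._lock:
            self._store[(principal_id, key)] = entry


async def demo() -> tuple:
    clock = FakeClock(start=1000.0)
    backend = MiniMemoryBackend(clock=clock)
    await backend.put("buyer-1", "key-1", Entry("cached", expires_at_epoch=1060.0))

    before = await backend.get("buyer-1", "key-1")  # fresh at t=1000
    clock.advance(60.0)                             # t=1060: expiry is inclusive
    after = await backend.get("buyer-1", "key-1")
    other = await backend.get("buyer-2", "key-1")   # keys are scoped per principal
    return before, after, other


before, after, other = asyncio.run(demo())
```

With the real class, the same test would presumably pass the fake clock as MemoryBackend(clock=clock); no monkeypatching of the time module is needed.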

Ancestors

Methods

async def clear(self) ‑> None
async def clear(self) -> None:
    """Remove all cached entries.

    Test-suite hook — handy for resetting state between fixtures when a
    single :class:`MemoryBackend` is shared across multiple tests.
    """
    async with self._lock:
        self._store.clear()

Remove all cached entries.

Test-suite hook, handy for resetting state between fixtures when a single MemoryBackend is shared across multiple tests.

Inherited members

class NotImplementedResponse (**data: Any)
class NotImplementedResponse(BaseModel):
    """Standard response for operations not supported by this handler."""

    supported: bool = False
    reason: str = "This operation is not supported by this agent"
    error: Error | None = None

Standard response for operations not supported by this handler.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var error : Error | None
var model_config
var reason : str
var supported : bool
class ProposalBuilder (name: str, proposal_id: str | None = None)
class ProposalBuilder:
    """Builder for ADCP Proposals.

    Helps construct valid proposals for get_products responses. Proposals
    represent recommended media plans with budget allocations.

    Example:
        proposal = (
            ProposalBuilder("Q1 Brand Campaign")
            .with_description("Balanced awareness campaign")
            .add_allocation("product-1", 60)
                .with_rationale("High-impact display")
            .add_allocation("product-2", 40)
                .with_rationale("Contextual targeting")
            .with_budget_guidance(min=10000, recommended=25000, max=50000)
            .build()
        )
    """

    def __init__(self, name: str, proposal_id: str | None = None):
        """Create a new proposal builder.

        Args:
            name: Human-readable name for the proposal
            proposal_id: Unique ID (auto-generated if not provided)
        """
        self._name = name
        self._proposal_id = proposal_id or f"proposal-{uuid4().hex[:8]}"
        self._description: str | None = None
        self._brief_alignment: str | None = None
        self._expires_at: datetime | None = None
        self._allocations: list[dict[str, Any]] = []
        self._budget_guidance: dict[str, Any] | None = None
        self._current_allocation: AllocationBuilder | None = None
        self._ext: dict[str, Any] | None = None

    def with_description(self, description: str) -> ProposalBuilder:
        """Add description explaining the proposal strategy.

        Args:
            description: What the proposal achieves
        """
        self._finalize_allocation()
        self._description = description
        return self

    def with_brief_alignment(self, alignment: str) -> ProposalBuilder:
        """Explain how proposal aligns with campaign brief.

        Args:
            alignment: Alignment explanation
        """
        self._finalize_allocation()
        self._brief_alignment = alignment
        return self

    def expires_in(self, days: int = 7) -> ProposalBuilder:
        """Set expiration relative to now.

        Args:
            days: Number of days until expiration
        """
        self._finalize_allocation()
        self._expires_at = datetime.now(timezone.utc) + timedelta(days=days)
        return self

    def expires_at(self, expires: datetime) -> ProposalBuilder:
        """Set absolute expiration time.

        Args:
            expires: When the proposal expires
        """
        self._finalize_allocation()
        self._expires_at = expires
        return self

    def add_allocation(
        self,
        product_id: str,
        allocation_percentage: float,
    ) -> ProposalBuilder:
        """Add a product allocation.

        After calling this, chain allocation methods (with_rationale, etc.)
        before adding another allocation or calling build().

        Args:
            product_id: ID of the product
            allocation_percentage: Percentage of budget (0-100)

        Returns:
            Self for method chaining
        """
        self._finalize_allocation()
        self._current_allocation = AllocationBuilder(product_id, allocation_percentage)
        return self

    def with_pricing_option(self, pricing_option_id: str) -> ProposalBuilder:
        """Set pricing option for current allocation."""
        if self._current_allocation:
            self._current_allocation.with_pricing_option(pricing_option_id)
        return self

    def with_rationale(self, rationale: str) -> ProposalBuilder:
        """Add rationale for current allocation."""
        if self._current_allocation:
            self._current_allocation.with_rationale(rationale)
        return self

    def with_sequence(self, sequence: int) -> ProposalBuilder:
        """Set sequence for current allocation."""
        if self._current_allocation:
            self._current_allocation.with_sequence(sequence)
        return self

    def with_tags(self, tags: list[str]) -> ProposalBuilder:
        """Add tags for current allocation."""
        if self._current_allocation:
            self._current_allocation.with_tags(tags)
        return self

    def with_budget_guidance(
        self,
        *,
        min: float | None = None,
        recommended: float | None = None,
        max: float | None = None,
        currency: str = "USD",
    ) -> ProposalBuilder:
        """Add budget guidance for the proposal.

        Args:
            min: Minimum recommended budget
            recommended: Optimal budget
            max: Maximum before diminishing returns
            currency: ISO 4217 currency code
        """
        self._finalize_allocation()
        self._budget_guidance = {
            "currency": currency,
        }
        if min is not None:
            self._budget_guidance["min"] = min
        if recommended is not None:
            self._budget_guidance["recommended"] = recommended
        if max is not None:
            self._budget_guidance["max"] = max
        return self

    def with_extension(self, ext: dict[str, Any]) -> ProposalBuilder:
        """Add extension data.

        Args:
            ext: Extension object
        """
        self._finalize_allocation()
        self._ext = ext
        return self

    def _finalize_allocation(self) -> None:
        """Finalize current allocation and add to list."""
        if self._current_allocation:
            self._allocations.append(self._current_allocation.build())
            self._current_allocation = None

    def build(self) -> dict[str, Any]:
        """Build the proposal dict.

        Returns:
            Proposal as a dict ready for use in get_products response

        Raises:
            ValueError: If there are no allocations, or if the allocation
                percentages don't sum to 100
        """
        self._finalize_allocation()

        if not self._allocations:
            raise ValueError("Proposal must have at least one allocation")

        total = sum(a["allocation_percentage"] for a in self._allocations)
        if abs(total - 100.0) > 0.01:
            raise ValueError(f"Allocation percentages must sum to 100, got {total}")

        proposal: dict[str, Any] = {
            "proposal_id": self._proposal_id,
            "name": self._name,
            "allocations": self._allocations,
        }

        if self._description:
            proposal["description"] = self._description
        if self._brief_alignment:
            proposal["brief_alignment"] = self._brief_alignment
        if self._expires_at:
            proposal["expires_at"] = self._expires_at.isoformat()
        if self._budget_guidance:
            proposal["total_budget_guidance"] = self._budget_guidance
        if self._ext:
            proposal["ext"] = self._ext

        return proposal

    def validate(self) -> list[str]:
        """Validate the proposal without building.

        Returns:
            List of validation errors (empty if valid)
        """
        errors: list[str] = []

        if self._current_allocation:
            allocations = self._allocations + [self._current_allocation.build()]
        else:
            allocations = self._allocations

        if not allocations:
            errors.append("Proposal must have at least one allocation")
        else:
            total = sum(a["allocation_percentage"] for a in allocations)
            if abs(total - 100.0) > 0.01:
                errors.append(f"Allocation percentages must sum to 100, got {total}")

        return errors

Builder for ADCP Proposals.

Helps construct valid proposals for get_products responses. Proposals represent recommended media plans with budget allocations.

Example

proposal = (
    ProposalBuilder("Q1 Brand Campaign")
    .with_description("Balanced awareness campaign")
    .add_allocation("product-1", 60)
        .with_rationale("High-impact display")
    .add_allocation("product-2", 40)
        .with_rationale("Contextual targeting")
    .with_budget_guidance(min=10000, recommended=25000, max=50000)
    .build()
)

Create a new proposal builder.

Args

name
Human-readable name for the proposal
proposal_id
Unique ID (auto-generated if not provided)

Methods

def add_allocation(self, product_id: str, allocation_percentage: float) ‑> ProposalBuilder
def add_allocation(
    self,
    product_id: str,
    allocation_percentage: float,
) -> ProposalBuilder:
    """Add a product allocation.

    After calling this, chain allocation methods (with_rationale, etc.)
    before adding another allocation or calling build().

    Args:
        product_id: ID of the product
        allocation_percentage: Percentage of budget (0-100)

    Returns:
        Self for method chaining
    """
    self._finalize_allocation()
    self._current_allocation = AllocationBuilder(product_id, allocation_percentage)
    return self

Add a product allocation.

After calling this, chain allocation methods (with_rationale, etc.) before adding another allocation or calling build().

Args

product_id
ID of the product
allocation_percentage
Percentage of budget (0-100)

Returns

Self for method chaining
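The ordering rule above matters because allocation-level methods only apply while an allocation is open, and every proposal-level method closes it. The sketch below illustrates that bookkeeping with a hypothetical MiniProposalBuilder that copies only the _finalize_allocation pattern from the source; the allocation dict shape is an assumption, since the real AllocationBuilder output is not shown here.

```python
from typing import Any, Optional


class MiniProposalBuilder:
    """Hypothetical stand-in mirroring only the current-allocation bookkeeping."""

    def __init__(self) -> None:
        self._allocations: list[dict[str, Any]] = []
        self._current: Optional[dict[str, Any]] = None
        self._description: Optional[str] = None

    def _finalize_allocation(self) -> None:
        if self._current:
            self._allocations.append(self._current)
            self._current = None

    def add_allocation(self, product_id: str, pct: float) -> "MiniProposalBuilder":
        self._finalize_allocation()
        # Assumed dict shape, for illustration only.
        self._current = {"product_id": product_id, "allocation_percentage": pct}
        return self

    def with_rationale(self, rationale: str) -> "MiniProposalBuilder":
        if self._current:  # silently ignored when no allocation is open
            self._current["rationale"] = rationale
        return self

    def with_description(self, description: str) -> "MiniProposalBuilder":
        self._finalize_allocation()  # proposal-level call closes the open allocation
        self._description = description
        return self

    def allocations(self) -> list[dict[str, Any]]:
        self._finalize_allocation()
        return self._allocations


# Rationale chained directly after add_allocation: it is attached.
good = (
    MiniProposalBuilder()
    .add_allocation("product-1", 60)
    .with_rationale("High-impact display")
    .allocations()
)

# A proposal-level call in between closes the allocation: the rationale is dropped.
lost = (
    MiniProposalBuilder()
    .add_allocation("product-1", 60)
    .with_description("Balanced awareness campaign")
    .with_rationale("High-impact display")
    .allocations()
)
```

No error is raised in the second case, so it is worth keeping allocation-level calls immediately after the add_allocation they belong to.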

def build(self) ‑> dict[str, typing.Any]
def build(self) -> dict[str, Any]:
    """Build the proposal dict.

    Returns:
        Proposal as a dict ready for use in get_products response

    Raises:
        ValueError: If there are no allocations, or if the allocation
            percentages don't sum to 100
    """
    self._finalize_allocation()

    if not self._allocations:
        raise ValueError("Proposal must have at least one allocation")

    total = sum(a["allocation_percentage"] for a in self._allocations)
    if abs(total - 100.0) > 0.01:
        raise ValueError(f"Allocation percentages must sum to 100, got {total}")

    proposal: dict[str, Any] = {
        "proposal_id": self._proposal_id,
        "name": self._name,
        "allocations": self._allocations,
    }

    if self._description:
        proposal["description"] = self._description
    if self._brief_alignment:
        proposal["brief_alignment"] = self._brief_alignment
    if self._expires_at:
        proposal["expires_at"] = self._expires_at.isoformat()
    if self._budget_guidance:
        proposal["total_budget_guidance"] = self._budget_guidance
    if self._ext:
        proposal["ext"] = self._ext

    return proposal

Build the proposal dict.

Returns

Proposal as a dict ready for use in get_products response

Raises

ValueError
If the proposal has no allocations, or if the allocation percentages don't sum to 100 (within a tolerance of 0.01)
def expires_at(self, expires: datetime) ‑> ProposalBuilder
def expires_at(self, expires: datetime) -> ProposalBuilder:
    """Set absolute expiration time.

    Args:
        expires: When the proposal expires
    """
    self._finalize_allocation()
    self._expires_at = expires
    return self

Set absolute expiration time.

Args

expires
When the proposal expires
def expires_in(self, days: int = 7) ‑> ProposalBuilder
def expires_in(self, days: int = 7) -> ProposalBuilder:
    """Set expiration relative to now.

    Args:
        days: Number of days until expiration
    """
    self._finalize_allocation()
    self._expires_at = datetime.now(timezone.utc) + timedelta(days=days)
    return self

Set expiration relative to now.

Args

days
Number of days until expiration
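The two expiration setters differ in who controls the timezone. expires_in builds a timezone-aware UTC datetime, while expires_at stores whatever the caller passes, and build() serializes either with isoformat(). A small standard-library sketch of the resulting strings, using a fixed "now" (an assumption made for reproducibility) in place of datetime.now(timezone.utc):

```python
from datetime import datetime, timedelta, timezone

# Fixed "now" so the output is reproducible; expires_in uses datetime.now(timezone.utc).
now = datetime(2025, 1, 1, tzinfo=timezone.utc)

# What expires_in(days=7) would compute from this "now".
relative = now + timedelta(days=7)
aware_iso = relative.isoformat()

# A naive datetime passed to expires_at carries no offset into the output.
naive = datetime(2025, 1, 8)
naive_iso = naive.isoformat()

print(aware_iso)  # 2025-01-08T00:00:00+00:00
print(naive_iso)  # 2025-01-08T00:00:00
```

Passing a timezone-aware datetime to expires_at keeps the serialized value unambiguous for consumers of the proposal.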
def validate(self) ‑> list[str]
def validate(self) -> list[str]:
    """Validate the proposal without building.

    Returns:
        List of validation errors (empty if valid)
    """
    errors: list[str] = []

    if self._current_allocation:
        allocations = self._allocations + [self._current_allocation.build()]
    else:
        allocations = self._allocations

    if not allocations:
        errors.append("Proposal must have at least one allocation")
    else:
        total = sum(a["allocation_percentage"] for a in allocations)
        if abs(total - 100.0) > 0.01:
            errors.append(f"Allocation percentages must sum to 100, got {total}")

    return errors

Validate the proposal without building.

Returns

List of validation errors (empty if valid)
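The checks performed by validate() reduce to two rules: at least one allocation, and a total within 0.01 of 100. The sketch below restates them as a standalone function; allocation_errors is a hypothetical helper that takes bare percentages rather than a builder, so it runs without the adcp package.

```python
def allocation_errors(percentages: list[float]) -> list[str]:
    """Mirror of the checks in validate(): non-empty, and sum within 0.01 of 100."""
    errors: list[str] = []
    if not percentages:
        errors.append("Proposal must have at least one allocation")
    else:
        total = sum(percentages)
        if abs(total - 100.0) > 0.01:
            errors.append(f"Allocation percentages must sum to 100, got {total}")
    return errors


empty = allocation_errors([])                        # no allocations at all
short = allocation_errors([60, 30])                  # sums to 90
thirds = allocation_errors([33.333, 33.333, 33.334]) # float sum lands within tolerance
```

Because validate() returns a list instead of raising, it suits cases where errors should be reported back to a caller; build() raises ValueError for the same conditions.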

def with_brief_alignment(self, alignment: str) ‑> ProposalBuilder
def with_brief_alignment(self, alignment: str) -> ProposalBuilder:
    """Explain how proposal aligns with campaign brief.

    Args:
        alignment: Alignment explanation
    """
    self._finalize_allocation()
    self._brief_alignment = alignment
    return self

Explain how proposal aligns with campaign brief.

Args

alignment
Alignment explanation
def with_budget_guidance(self,
*,
min: float | None = None,
recommended: float | None = None,
max: float | None = None,
currency: str = 'USD') ‑> ProposalBuilder
def with_budget_guidance(
    self,
    *,
    min: float | None = None,
    recommended: float | None = None,
    max: float | None = None,
    currency: str = "USD",
) -> ProposalBuilder:
    """Add budget guidance for the proposal.

    Args:
        min: Minimum recommended budget
        recommended: Optimal budget
        max: Maximum before diminishing returns
        currency: ISO 4217 currency code
    """
    self._finalize_allocation()
    self._budget_guidance = {
        "currency": currency,
    }
    if min is not None:
        self._budget_guidance["min"] = min
    if recommended is not None:
        self._budget_guidance["recommended"] = recommended
    if max is not None:
        self._budget_guidance["max"] = max
    return self

Add budget guidance for the proposal.

Args

min
Minimum recommended budget
recommended
Optimal budget
max
Maximum before diminishing returns
currency
ISO 4217 currency code
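Only the amounts that are explicitly provided end up in the guidance dict, while currency is always present. The sketch below restates that logic as a standalone function; budget_guidance is a hypothetical helper mirroring the method body shown above, not part of the library.

```python
from typing import Any, Optional


def budget_guidance(
    *,
    min: Optional[float] = None,
    recommended: Optional[float] = None,
    max: Optional[float] = None,
    currency: str = "USD",
) -> dict[str, Any]:
    """Mirror of the dict assembled by with_budget_guidance."""
    guidance: dict[str, Any] = {"currency": currency}
    # "is not None" keeps explicit zeros while omitting unset amounts.
    if min is not None:
        guidance["min"] = min
    if recommended is not None:
        guidance["recommended"] = recommended
    if max is not None:
        guidance["max"] = max
    return guidance


partial = budget_guidance(min=10000, max=50000)
zero_floor = budget_guidance(min=0, currency="EUR")
default = budget_guidance()
```

Calling the method with no amounts still yields a currency-only dict, which build() emits under total_budget_guidance because a non-empty dict is truthy.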
def with_description(self, description: str) ‑> ProposalBuilder
def with_description(self, description: str) -> ProposalBuilder:
    """Add description explaining the proposal strategy.

    Args:
        description: What the proposal achieves
    """
    self._finalize_allocation()
    self._description = description
    return self

Add description explaining the proposal strategy.

Args

description
What the proposal achieves
def with_extension(self, ext: dict[str, Any]) ‑> ProposalBuilder
def with_extension(self, ext: dict[str, Any]) -> ProposalBuilder:
    """Add extension data.

    Args:
        ext: Extension object
    """
    self._finalize_allocation()
    self._ext = ext
    return self

Add extension data.

Args

ext
Extension object
def with_pricing_option(self, pricing_option_id: str) ‑> ProposalBuilder
def with_pricing_option(self, pricing_option_id: str) -> ProposalBuilder:
    """Set pricing option for current allocation."""
    if self._current_allocation:
        self._current_allocation.with_pricing_option(pricing_option_id)
    return self

Set pricing option for current allocation.

def with_rationale(self, rationale: str) ‑> ProposalBuilder
def with_rationale(self, rationale: str) -> ProposalBuilder:
    """Add rationale for current allocation."""
    if self._current_allocation:
        self._current_allocation.with_rationale(rationale)
    return self

Add rationale for current allocation.

def with_sequence(self, sequence: int) ‑> ProposalBuilder
def with_sequence(self, sequence: int) -> ProposalBuilder:
    """Set sequence for current allocation."""
    if self._current_allocation:
        self._current_allocation.with_sequence(sequence)
    return self

Set sequence for current allocation.

def with_tags(self, tags: list[str]) ‑> ProposalBuilder
def with_tags(self, tags: list[str]) -> ProposalBuilder:
    """Add tags for current allocation."""
    if self._current_allocation:
        self._current_allocation.with_tags(tags)
    return self

Add tags for current allocation.

class ProposalNotSupported (**data: Any)
class ProposalNotSupported(BaseModel):
    """Response indicating proposal generation is not supported.

    Use this when your agent supports get_products but not proposal generation.
    """

    proposals_supported: bool = False
    reason: str = "This agent does not generate proposals"
    error: Error | None = None

Response indicating proposal generation is not supported.

Use this when your agent supports get_products but not proposal generation.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var error : Error | None
var model_config
var proposals_supported : bool
var reason : str
class SponsoredIntelligenceHandler
class SponsoredIntelligenceHandler(ADCPHandler):
    """Handler for Sponsored Intelligence protocol.

    Subclass this to implement a Sponsored Intelligence agent. All SI
    operations must be implemented via the handle_* methods.
    The public methods (si_get_offering, etc.) handle validation and
    error handling automatically.

    Non-SI operations (get_products, create_media_buy, content standards, etc.)
    return 'not supported' via the base class.

    Example:
        class MySIHandler(SponsoredIntelligenceHandler):
            async def handle_si_get_offering(
                self,
                request: SiGetOfferingRequest,
                context: ToolContext | None = None
            ) -> SiGetOfferingResponse:
                # Your implementation
                return SiGetOfferingResponse(...)
    """

    _agent_type: str = "Sponsored Intelligence agents"

    # ========================================================================
    # Sponsored Intelligence Operations - Override base class with validation
    # ========================================================================

    async def si_get_offering(
        self,
        params: SiGetOfferingRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> SiGetOfferingResponse | NotImplementedResponse:
        """Get sponsored intelligence offering.

        Validates params and delegates to handle_si_get_offering.
        """
        try:
            request = SiGetOfferingRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_si_get_offering(request, context)

    async def si_initiate_session(
        self,
        params: SiInitiateSessionRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> SiInitiateSessionResponse | NotImplementedResponse:
        """Initiate sponsored intelligence session.

        Validates params and delegates to handle_si_initiate_session.
        """
        try:
            request = SiInitiateSessionRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_si_initiate_session(request, context)

    async def si_send_message(
        self,
        params: SiSendMessageRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> SiSendMessageResponse | NotImplementedResponse:
        """Send message in sponsored intelligence session.

        Validates params and delegates to handle_si_send_message.
        """
        try:
            request = SiSendMessageRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_si_send_message(request, context)

    async def si_terminate_session(
        self,
        params: SiTerminateSessionRequest | dict[str, Any],
        context: ToolContext | None = None,
    ) -> SiTerminateSessionResponse | NotImplementedResponse:
        """Terminate sponsored intelligence session.

        Validates params and delegates to handle_si_terminate_session.
        """
        try:
            request = SiTerminateSessionRequest.model_validate(params)
        except ValidationError as e:
            return NotImplementedResponse(
                supported=False,
                reason=f"Invalid request: {e}",
                error=Error(code="VALIDATION_ERROR", message=str(e)),
            )
        return await self.handle_si_terminate_session(request, context)

    # ========================================================================
    # Abstract handlers - Implement these in subclasses
    # ========================================================================

    @abstractmethod
    async def handle_si_get_offering(
        self,
        request: SiGetOfferingRequest,
        context: ToolContext | None = None,
    ) -> SiGetOfferingResponse:
        """Handle get offering request."""
        ...

    @abstractmethod
    async def handle_si_initiate_session(
        self,
        request: SiInitiateSessionRequest,
        context: ToolContext | None = None,
    ) -> SiInitiateSessionResponse:
        """Handle initiate session request."""
        ...

    @abstractmethod
    async def handle_si_send_message(
        self,
        request: SiSendMessageRequest,
        context: ToolContext | None = None,
    ) -> SiSendMessageResponse:
        """Handle send message request."""
        ...

    @abstractmethod
    async def handle_si_terminate_session(
        self,
        request: SiTerminateSessionRequest,
        context: ToolContext | None = None,
    ) -> SiTerminateSessionResponse:
        """Handle terminate session request."""
        ...

Handler for Sponsored Intelligence protocol.

Subclass this to implement a Sponsored Intelligence agent. All SI operations must be implemented via the handle_* methods. The public methods (si_get_offering, etc.) handle validation and error handling automatically.

Non-SI operations (get_products, create_media_buy, content standards, etc.) return 'not supported' via the base class.

Example

class MySIHandler(SponsoredIntelligenceHandler):
    async def handle_si_get_offering(
        self,
        request: SiGetOfferingRequest,
        context: ToolContext | None = None
    ) -> SiGetOfferingResponse:
        # Your implementation
        return SiGetOfferingResponse(...)
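The split between the public si_* methods and the handle_* hooks is a validate-then-delegate pattern. The sketch below shows its shape using only the standard library. All names are hypothetical: OfferingRequest stands in for the Pydantic request model (its offering_id field is invented for illustration), MiniSIHandler stands in for the base class, and plain dicts stand in for the response models.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


@dataclass
class OfferingRequest:
    """Hypothetical request type; the real SiGetOfferingRequest fields are not shown."""
    offering_id: str

    @classmethod
    def validate(cls, params: Any) -> "OfferingRequest":
        # Accept an already-typed request or a raw dict, like model_validate does.
        if isinstance(params, cls):
            return params
        offering_id = params.get("offering_id")
        if not isinstance(offering_id, str):
            raise ValueError("offering_id must be a string")
        return cls(offering_id=offering_id)


class MiniSIHandler(ABC):
    async def si_get_offering(self, params: Any, context: Any = None) -> dict[str, Any]:
        # Public entry point: validate first, then delegate to the typed hook.
        try:
            request = OfferingRequest.validate(params)
        except ValueError as e:
            return {
                "supported": False,
                "reason": f"Invalid request: {e}",
                "error": {"code": "VALIDATION_ERROR", "message": str(e)},
            }
        return await self.handle_si_get_offering(request, context)

    @abstractmethod
    async def handle_si_get_offering(
        self, request: OfferingRequest, context: Any = None
    ) -> dict[str, Any]:
        ...


class MySIHandler(MiniSIHandler):
    async def handle_si_get_offering(self, request, context=None):
        # Business logic only ever sees a validated, typed request.
        return {"offering_id": request.offering_id, "title": "Demo offering"}


handler = MySIHandler()
valid = asyncio.run(handler.si_get_offering({"offering_id": "off-1"}))
invalid = asyncio.run(handler.si_get_offering({"offering_id": 42}))
```

As in the source above, a validation failure comes back as a response object rather than an exception, so callers should inspect the result instead of relying on try/except.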

Ancestors

Methods

async def handle_si_get_offering(self,
request: SiGetOfferingRequest,
context: ToolContext | None = None) ‑> SiGetOfferingResponse
@abstractmethod
async def handle_si_get_offering(
    self,
    request: SiGetOfferingRequest,
    context: ToolContext | None = None,
) -> SiGetOfferingResponse:
    """Handle get offering request."""
    ...

Handle get offering request.

async def handle_si_initiate_session(self,
request: SiInitiateSessionRequest,
context: ToolContext | None = None) ‑> SiInitiateSessionResponse
@abstractmethod
async def handle_si_initiate_session(
    self,
    request: SiInitiateSessionRequest,
    context: ToolContext | None = None,
) -> SiInitiateSessionResponse:
    """Handle initiate session request."""
    ...

Handle initiate session request.

async def handle_si_send_message(self,
request: SiSendMessageRequest,
context: ToolContext | None = None) ‑> SiSendMessageResponse
@abstractmethod
async def handle_si_send_message(
    self,
    request: SiSendMessageRequest,
    context: ToolContext | None = None,
) -> SiSendMessageResponse:
    """Handle send message request."""
    ...

Handle send message request.

async def handle_si_terminate_session(self,
request: SiTerminateSessionRequest,
context: ToolContext | None = None) ‑> SiTerminateSessionResponse
@abstractmethod
async def handle_si_terminate_session(
    self,
    request: SiTerminateSessionRequest,
    context: ToolContext | None = None,
) -> SiTerminateSessionResponse:
    """Handle terminate session request."""
    ...

Handle terminate session request.

async def si_get_offering(self,
params: SiGetOfferingRequest | dict[str, Any],
context: ToolContext | None = None) ‑> SiGetOfferingResponse | NotImplementedResponse
async def si_get_offering(
    self,
    params: SiGetOfferingRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> SiGetOfferingResponse | NotImplementedResponse:
    """Get sponsored intelligence offering.

    Validates params and delegates to handle_si_get_offering.
    """
    try:
        request = SiGetOfferingRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_si_get_offering(request, context)

Get sponsored intelligence offering.

Validates params and delegates to handle_si_get_offering.

async def si_initiate_session(self,
params: SiInitiateSessionRequest | dict[str, Any],
context: ToolContext | None = None) ‑> SiInitiateSessionResponse | NotImplementedResponse
async def si_initiate_session(
    self,
    params: SiInitiateSessionRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> SiInitiateSessionResponse | NotImplementedResponse:
    """Initiate sponsored intelligence session.

    Validates params and delegates to handle_si_initiate_session.
    """
    try:
        request = SiInitiateSessionRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_si_initiate_session(request, context)

Initiate sponsored intelligence session.

Validates params and delegates to handle_si_initiate_session.

async def si_send_message(self,
params: SiSendMessageRequest | dict[str, Any],
context: ToolContext | None = None) ‑> SiSendMessageResponse | NotImplementedResponse
async def si_send_message(
    self,
    params: SiSendMessageRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> SiSendMessageResponse | NotImplementedResponse:
    """Send message in sponsored intelligence session.

    Validates params and delegates to handle_si_send_message.
    """
    try:
        request = SiSendMessageRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_si_send_message(request, context)

Send message in sponsored intelligence session.

Validates params and delegates to handle_si_send_message.

async def si_terminate_session(self,
params: SiTerminateSessionRequest | dict[str, Any],
context: ToolContext | None = None) ‑> SiTerminateSessionResponse | NotImplementedResponse
async def si_terminate_session(
    self,
    params: SiTerminateSessionRequest | dict[str, Any],
    context: ToolContext | None = None,
) -> SiTerminateSessionResponse | NotImplementedResponse:
    """Terminate sponsored intelligence session.

    Validates params and delegates to handle_si_terminate_session.
    """
    try:
        request = SiTerminateSessionRequest.model_validate(params)
    except ValidationError as e:
        return NotImplementedResponse(
            supported=False,
            reason=f"Invalid request: {e}",
            error=Error(code="VALIDATION_ERROR", message=str(e)),
        )
    return await self.handle_si_terminate_session(request, context)

Terminate sponsored intelligence session.

Validates params and delegates to handle_si_terminate_session.
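
The three si_* methods above share one pattern: validate the incoming params, and on failure return a response carrying a VALIDATION_ERROR instead of raising. The sketch below illustrates that pattern using only the standard library. SendMessageRequest, its validate classmethod, SketchHandler, and the plain-dict responses are hypothetical stand-ins, not the library's Pydantic models or response classes.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class SendMessageRequest:
    """Hypothetical stand-in for SiSendMessageRequest."""

    session_id: str
    message: str

    @classmethod
    def validate(cls, params: Any) -> "SendMessageRequest":
        # Accept an already-built request unchanged, or build one from a dict.
        if isinstance(params, cls):
            return params
        try:
            return cls(**params)
        except TypeError as exc:  # missing keys, unexpected keys, or not a mapping
            raise ValueError(str(exc)) from exc


class SketchHandler:
    async def si_send_message(self, params: Any, context: Any = None) -> dict:
        try:
            request = SendMessageRequest.validate(params)
        except ValueError as exc:
            # Same idea as the listing above: report the failure in the
            # response body rather than letting the exception propagate.
            return {
                "supported": False,
                "reason": f"Invalid request: {exc}",
                "error": {"code": "VALIDATION_ERROR", "message": str(exc)},
            }
        return await self.handle_si_send_message(request, context)

    async def handle_si_send_message(self, request: SendMessageRequest, context: Any = None) -> dict:
        # Business logic only ever sees a validated request object.
        return {"session_id": request.session_id, "echo": request.message}


handler = SketchHandler()
ok = asyncio.run(handler.si_send_message({"session_id": "s1", "message": "hi"}))
bad = asyncio.run(handler.si_send_message({"session_id": "s1"}))  # "message" is missing
```

A caller that needs to tell a validation failure apart from a genuinely unsupported tool would, under these assumptions, inspect the error code rather than the supported flag alone.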

class TestControllerError (code: str, message: str, current_state: str | None = None)
class TestControllerError(Exception):
    """Typed error for test controller store methods.

    Raise this from your TestControllerStore methods to return structured
    error responses. The dispatcher catches it and converts to the AdCP
    comply_test_controller error format.

    Example:
        async def force_media_buy_status(self, media_buy_id, status, rejection_reason=None):
            prev = self.media_buys.get(media_buy_id)
            if prev is None:
                raise TestControllerError("NOT_FOUND", f"Media buy {media_buy_id} not found")
            if prev in ("completed", "rejected", "canceled"):
                raise TestControllerError(
                    "INVALID_TRANSITION",
                    f"Cannot transition from {prev}",
                    current_state=prev,
                )
            self.media_buys[media_buy_id] = status
            return {"previous_state": prev, "current_state": status}
    """

    def __init__(self, code: str, message: str, current_state: str | None = None):
        super().__init__(message)
        self.code = code
        self.current_state = current_state

Typed error for test controller store methods.

Raise this from your TestControllerStore methods to return structured error responses. The dispatcher catches it and converts to the AdCP comply_test_controller error format.

Example

async def force_media_buy_status(self, media_buy_id, status, rejection_reason=None):
    prev = self.media_buys.get(media_buy_id)
    if prev is None:
        raise TestControllerError("NOT_FOUND", f"Media buy {media_buy_id} not found")
    if prev in ("completed", "rejected", "canceled"):
        raise TestControllerError(
            "INVALID_TRANSITION",
            f"Cannot transition from {prev}",
            current_state=prev,
        )
    self.media_buys[media_buy_id] = status
    return {"previous_state": prev, "current_state": status}
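
The constructor shown above stores code and current_state as attributes but passes message to Exception, so the message is read back with str(exc). The sketch below copies the class from the listing and shows one way a caller could turn the error into a dict. to_error_payload and its key names are hypothetical; the actual comply_test_controller error format produced by the dispatcher is not shown in this section and may differ.

```python
class TestControllerError(Exception):
    # Copied from the source listing above so the sketch runs on its own.
    def __init__(self, code, message, current_state=None):
        super().__init__(message)
        self.code = code
        self.current_state = current_state


def to_error_payload(exc):
    """Hypothetical conversion of the error into a plain dict."""
    payload = {"code": exc.code, "message": str(exc)}
    # Only include current_state when the store supplied one.
    if exc.current_state is not None:
        payload["current_state"] = exc.current_state
    return payload


try:
    raise TestControllerError(
        "INVALID_TRANSITION",
        "Cannot transition from completed",
        current_state="completed",
    )
except TestControllerError as exc:
    payload = to_error_payload(exc)

not_found = to_error_payload(TestControllerError("NOT_FOUND", "Media buy mb_1 not found"))
```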

Ancestors

  • builtins.Exception
  • builtins.BaseException
class TestControllerStore
class TestControllerStore:
    """Base class for test controller state management.

    Subclass this and override the methods for scenarios your agent supports.
    Methods you don't override will be reported as unsupported scenarios
    and excluded from list_scenarios.

    Raise TestControllerError for structured error responses.
    """

    async def force_creative_status(
        self, creative_id: str, status: str, rejection_reason: str | None = None
    ) -> dict[str, Any]:
        """Force a creative to a given status.

        Returns:
            {"previous_state": str, "current_state": str}
        """
        raise NotImplementedError

    async def force_account_status(self, account_id: str, status: str) -> dict[str, Any]:
        """Force an account to a given status.

        Returns:
            {"previous_state": str, "current_state": str}
        """
        raise NotImplementedError

    async def force_media_buy_status(
        self, media_buy_id: str, status: str, rejection_reason: str | None = None
    ) -> dict[str, Any]:
        """Force a media buy to a given status.

        Returns:
            {"previous_state": str, "current_state": str}
        """
        raise NotImplementedError

    async def force_session_status(
        self, session_id: str, status: str, termination_reason: str | None = None
    ) -> dict[str, Any]:
        """Force a session to a given status.

        Returns:
            {"previous_state": str, "current_state": str}
        """
        raise NotImplementedError

    async def simulate_delivery(
        self,
        media_buy_id: str,
        impressions: int | None = None,
        clicks: int | None = None,
        conversions: int | None = None,
        reported_spend: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Simulate delivery metrics for a media buy.

        Returns:
            {"simulated": {...}, "cumulative": {...} | None}
        """
        raise NotImplementedError

    async def simulate_budget_spend(
        self,
        spend_percentage: float,
        account_id: str | None = None,
        media_buy_id: str | None = None,
    ) -> dict[str, Any]:
        """Simulate budget spend to a percentage.

        Returns:
            {"simulated": {...}}
        """
        raise NotImplementedError

Base class for test controller state management.

Subclass this and override the methods for scenarios your agent supports. Methods you don't override will be reported as unsupported scenarios and excluded from list_scenarios.

Raise TestControllerError for structured error responses.
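
The description above says that methods you do not override are reported as unsupported. How the framework detects this is not shown here. One common way to do it in Python is to compare each method on the subclass with the one on the base class, as in the sketch below. Store is a cut-down stand-in for TestControllerStore, and overridden_scenarios is a hypothetical helper, not part of adcp.server.

```python
class Store:
    """Stand-in for TestControllerStore, reduced to two methods."""

    async def force_account_status(self, account_id: str, status: str) -> dict:
        raise NotImplementedError

    async def force_session_status(self, session_id: str, status: str) -> dict:
        raise NotImplementedError


class MyStore(Store):
    # Only the account scenario is implemented.
    async def force_account_status(self, account_id: str, status: str) -> dict:
        return {"previous_state": "active", "current_state": status}


def overridden_scenarios(store: Store) -> list:
    """Return the names of base-class methods the subclass has replaced."""
    public = [
        name
        for name, value in vars(Store).items()
        if callable(value) and not name.startswith("_")
    ]
    # A method counts as overridden when the subclass attribute is a
    # different function object from the one defined on the base class.
    return sorted(
        name for name in public if getattr(type(store), name) is not getattr(Store, name)
    )


supported = overridden_scenarios(MyStore())
```

Under these assumptions, supported contains only force_account_status, and a bare Store() instance reports no scenarios at all.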

Methods

async def force_account_status(self, account_id: str, status: str) ‑> dict[str, typing.Any]
async def force_account_status(self, account_id: str, status: str) -> dict[str, Any]:
    """Force an account to a given status.

    Returns:
        {"previous_state": str, "current_state": str}
    """
    raise NotImplementedError

Force an account to a given status.

Returns

{"previous_state": str, "current_state": str}

async def force_creative_status(self, creative_id: str, status: str, rejection_reason: str | None = None) ‑> dict[str, typing.Any]
async def force_creative_status(
    self, creative_id: str, status: str, rejection_reason: str | None = None
) -> dict[str, Any]:
    """Force a creative to a given status.

    Returns:
        {"previous_state": str, "current_state": str}
    """
    raise NotImplementedError

Force a creative to a given status.

Returns

{"previous_state": str, "current_state": str}

async def force_media_buy_status(self, media_buy_id: str, status: str, rejection_reason: str | None = None) ‑> dict[str, typing.Any]
async def force_media_buy_status(
    self, media_buy_id: str, status: str, rejection_reason: str | None = None
) -> dict[str, Any]:
    """Force a media buy to a given status.

    Returns:
        {"previous_state": str, "current_state": str}
    """
    raise NotImplementedError

Force a media buy to a given status.

Returns

{"previous_state": str, "current_state": str}

async def force_session_status(self, session_id: str, status: str, termination_reason: str | None = None) ‑> dict[str, typing.Any]
async def force_session_status(
    self, session_id: str, status: str, termination_reason: str | None = None
) -> dict[str, Any]:
    """Force a session to a given status.

    Returns:
        {"previous_state": str, "current_state": str}
    """
    raise NotImplementedError

Force a session to a given status.

Returns

{"previous_state": str, "current_state": str}

async def simulate_budget_spend(self,
spend_percentage: float,
account_id: str | None = None,
media_buy_id: str | None = None) ‑> dict[str, typing.Any]
async def simulate_budget_spend(
    self,
    spend_percentage: float,
    account_id: str | None = None,
    media_buy_id: str | None = None,
) -> dict[str, Any]:
    """Simulate budget spend to a percentage.

    Returns:
        {"simulated": {...}}
    """
    raise NotImplementedError

Simulate budget spend to a percentage.

Returns

{"simulated": {…}}

async def simulate_delivery(self,
media_buy_id: str,
impressions: int | None = None,
clicks: int | None = None,
conversions: int | None = None,
reported_spend: dict[str, Any] | None = None) ‑> dict[str, typing.Any]
async def simulate_delivery(
    self,
    media_buy_id: str,
    impressions: int | None = None,
    clicks: int | None = None,
    conversions: int | None = None,
    reported_spend: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Simulate delivery metrics for a media buy.

    Returns:
        {"simulated": {...}, "cumulative": {...} | None}
    """
    raise NotImplementedError

Simulate delivery metrics for a media buy.

Returns

{"simulated": {…}, "cumulative": {…} | None}
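
The return shape above pairs the metrics from this call with optional running totals. A minimal in-memory sketch of that shape follows. DeliveryStore is hypothetical and does not subclass the real TestControllerStore. The decision to total only the integer counters, and to echo reported_spend back without aggregating it, is an assumption made for illustration.

```python
import asyncio
from collections import Counter
from typing import Any, Optional


class DeliveryStore:
    """Hypothetical in-memory store; only simulate_delivery is sketched."""

    def __init__(self) -> None:
        self.totals = {}  # media_buy_id -> Counter of cumulative metrics

    async def simulate_delivery(
        self,
        media_buy_id: str,
        impressions: Optional[int] = None,
        clicks: Optional[int] = None,
        conversions: Optional[int] = None,
        reported_spend: Optional[dict] = None,
    ) -> dict:
        supplied = {"impressions": impressions, "clicks": clicks, "conversions": conversions}
        # Keep only the metrics the caller actually passed.
        simulated: dict = {k: v for k, v in supplied.items() if v is not None}
        running = self.totals.setdefault(media_buy_id, Counter())
        running.update(simulated)  # Counter.update adds to existing counts
        if reported_spend is not None:
            simulated["reported_spend"] = reported_spend
        return {"simulated": simulated, "cumulative": dict(running)}


store = DeliveryStore()
first = asyncio.run(store.simulate_delivery("mb_1", impressions=1000, clicks=10))
second = asyncio.run(store.simulate_delivery("mb_1", impressions=500))
```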

class TmpHandler
class TmpHandler(ADCPHandler):
    """Handler for Temporal Matching Protocol operations.

    Subclass this to implement context matching and identity matching.
    Only TMP tools will be exposed via MCP.

    Example:
        class MyTmpAgent(TmpHandler):
            async def context_match(self, params, context=None):
                # Evaluate context signals against buyer packages
                pass

            async def identity_match(self, params, context=None):
                # Evaluate user identity for package eligibility
                pass
    """

    _agent_type = "TMP agents"

Handler for Temporal Matching Protocol operations.

Subclass this to implement context matching and identity matching. Only TMP tools will be exposed via MCP.

Example

class MyTmpAgent(TmpHandler):
    async def context_match(self, params, context=None):
        # Evaluate context signals against buyer packages
        pass

    async def identity_match(self, params, context=None):
        # Evaluate user identity for package eligibility
        pass

class ToolContext (request_id: str | None = None,
caller_identity: str | None = None,
metadata: dict[str, Any] = <factory>)
@dataclass
class ToolContext:
    """Context passed to tool handlers.

    Contains metadata about the current request that may be useful
    for logging, authorization, or other cross-cutting concerns.

    :param caller_identity: The authenticated principal making the request.
        **MUST** be a stable, globally-unique identifier within the seller's
        tenant — never an email, display name, or any other mutable handle.
        The server-side idempotency middleware keys its cache by
        ``(caller_identity, idempotency_key)`` — reuse of the same string for
        two distinct principals (e.g. email reuse after account deletion)
        causes cross-principal replay (confidentiality leak). Populated by
        the transport layer (A2A: ``ServerCallContext.user.user_name``; MCP:
        seller's FastMCP auth middleware).
    """

    request_id: str | None = None
    caller_identity: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

Context passed to tool handlers.

Contains metadata about the current request that may be useful for logging, authorization, or other cross-cutting concerns.

caller_identity: The authenticated principal making the request. MUST be a stable, globally unique identifier within the seller's tenant, never an email, display name, or any other mutable handle. The server-side idempotency middleware keys its cache by (caller_identity, idempotency_key), so reusing the same string for two distinct principals (for example, an email reassigned after account deletion) causes cross-principal replay, which is a confidentiality leak. The transport layer populates this field (A2A: ServerCallContext.user.user_name; MCP: the seller's FastMCP auth middleware).
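
To make the replay risk concrete, the sketch below keys a toy cache by (caller_identity, idempotency_key) as the docstring describes. ToolContext mirrors the dataclass in the listing, with Optional in place of the union syntax. IdempotencyCache, get_or_run, and all identifiers and values are hypothetical; the framework's real middleware is not shown in this section.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class ToolContext:
    # Mirrors the fields of the dataclass shown in the source listing.
    request_id: Optional[str] = None
    caller_identity: Optional[str] = None
    metadata: dict = field(default_factory=dict)


class IdempotencyCache:
    """Hypothetical cache keyed by (caller_identity, idempotency_key)."""

    def __init__(self) -> None:
        self._responses = {}

    def get_or_run(self, context: ToolContext, idempotency_key: str, run: Callable[[], Any]) -> Any:
        key = (context.caller_identity, idempotency_key)
        if key not in self._responses:
            self._responses[key] = run()  # first request with this key
        return self._responses[key]  # later requests replay the stored response


cache = IdempotencyCache()

# Stable, distinct identifiers: the same idempotency key stays isolated.
alice = ToolContext(caller_identity="principal-7f3a")
bob = ToolContext(caller_identity="principal-91c2")
a = cache.get_or_run(alice, "key-1", lambda: {"media_buy_id": "mb_alice"})
b = cache.get_or_run(bob, "key-1", lambda: {"media_buy_id": "mb_bob"})

# Mutable handle reused by a different person: the new owner of the address
# receives the previous owner's cached response.
former_owner = ToolContext(caller_identity="pat@example.com")
new_owner = ToolContext(caller_identity="pat@example.com")
original = cache.get_or_run(former_owner, "key-2", lambda: {"media_buy_id": "mb_private"})
leaked = cache.get_or_run(new_owner, "key-2", lambda: {"media_buy_id": "mb_new"})
```

In this sketch leaked is the former owner's response, which is the cross-principal replay the docstring warns about.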

Instance variables

var caller_identity : str | None
var metadata : dict[str, typing.Any]
var request_id : str | None