Module adcp.webhooks

Webhook creation, signing, and reception for AdCP agents.

Single front door for both senders and receivers. Underlying modules in adcp.signing.webhook_* and adcp.webhook_receiver are implementation details kept for internal organization — prefer the re-exports here for stability.

Which sender helper to use

Functions

def create_a2a_webhook_payload(task_id: str,
status: GeneratedTaskStatus,
context_id: str,
result: AdcpAsyncResponseData | dict[str, Any],
timestamp: datetime | None = None) ‑> a2a.types.Task | a2a.types.TaskStatusUpdateEvent
Expand source code
def create_a2a_webhook_payload(
    task_id: str,
    status: GeneratedTaskStatus,
    context_id: str,
    result: AdcpAsyncResponseData | dict[str, Any],
    timestamp: datetime | None = None,
) -> Task | TaskStatusUpdateEvent:
    """
    Create A2A webhook payload (Task or TaskStatusUpdateEvent).

    Per A2A specification:
    - Terminated statuses (completed, failed): Returns Task with artifacts[].parts[]
    - Intermediate statuses (working, input-required, submitted): Returns TaskStatusUpdateEvent
      with status.message.parts[]

    This function helps agent implementations construct properly formatted A2A webhook
    payloads for sending to clients. The result is always wrapped in a single DataPart;
    there is no free-text ``message`` parameter on this helper.

    Args:
        task_id: Unique identifier for the task. Also used as the prefix of the
            generated artifact id (``<task_id>_result``) and message id
            (``<task_id>_msg``).
        status: Current task status
        context_id: Session/conversation identifier (required by A2A protocol)
        result: Task-specific payload (AdCP response data). Pydantic models are
            dumped with ``model_dump(mode="json")``; dicts are used as-is.
        timestamp: When the webhook was generated (defaults to current UTC time)

    Returns:
        Task object for terminated statuses, TaskStatusUpdateEvent (with
        ``final=False``) for intermediate statuses

    Examples:
        Create a completed Task webhook:
        >>> from adcp.webhooks import create_a2a_webhook_payload
        >>> from adcp.types import GeneratedTaskStatus
        >>>
        >>> task = create_a2a_webhook_payload(
        ...     task_id="task_123",
        ...     status=GeneratedTaskStatus.completed,
        ...     context_id="ctx_123",
        ...     result={"products": [...]},
        ... )
        >>> # task is a Task object with artifacts containing the result

        Create a working status update:
        >>> event = create_a2a_webhook_payload(
        ...     task_id="task_456",
        ...     status=GeneratedTaskStatus.working,
        ...     context_id="ctx_456",
        ...     result={"current_step": "processing", "percentage": 30},
        ... )
        >>> # event is a TaskStatusUpdateEvent with status.message

        Send A2A webhook via HTTP POST:
        >>> import httpx
        >>>
        >>> payload = create_a2a_webhook_payload(...)
        >>> # Task and TaskStatusUpdateEvent serialize the same way
        >>> payload_dict = payload.model_dump(mode='json')
        >>>
        >>> async with httpx.AsyncClient() as client:
        ...     response = await client.post(webhook_url, json=payload_dict)
    """
    if timestamp is None:
        timestamp = datetime.now(timezone.utc)

    # Convert datetime to ISO string for A2A protocol
    timestamp_str = timestamp.isoformat() if isinstance(timestamp, datetime) else timestamp

    # Normalize the status (enum member or plain string) to its string value
    status_value = status.value if hasattr(status, "value") else str(status)

    # Map AdCP status to A2A status state
    # Note: A2A uses "input-required" (hyphenated) while AdCP uses "input_required" (underscore)
    status_mapping = {
        "completed": "completed",
        "failed": "failed",
        "working": "working",
        "submitted": "submitted",
        "input_required": "input-required",
    }
    a2a_status_state = status_mapping.get(status_value, status_value)

    # Convert AdcpAsyncResponseData to dict if it's a Pydantic model
    if hasattr(result, "model_dump"):
        result_dict: dict[str, Any] = result.model_dump(mode="json")
    else:
        result_dict = result

    # The result always travels as exactly one DataPart, so the parts list is
    # never empty and needs no "no parts" fallback below.
    parts: list[Part] = [Part(root=DataPart(data=result_dict))]

    # Convert string to TaskState enum (raises ValueError for states A2A does not define)
    task_state_enum = TaskState(a2a_status_state)

    # Decide Task vs TaskStatusUpdateEvent on the normalized state string so the
    # outcome does not depend on how the raw status compares with enum members.
    is_terminated = a2a_status_state in ("completed", "failed")

    if is_terminated:
        # Terminated statuses: Task with the result carried in an artifact.
        # Artifact requires artifact_id, so derive one from task_id.
        return Task(
            id=task_id,
            status=TaskStatus(state=task_state_enum, timestamp=timestamp_str),
            artifacts=[Artifact(artifact_id=f"{task_id}_result", parts=parts)],
            context_id=context_id,
        )

    # Intermediate statuses: TaskStatusUpdateEvent with the result in status.message
    message_obj = Message(
        message_id=f"{task_id}_msg",
        role=Role.agent,  # Agent is responding
        parts=parts,
    )
    return TaskStatusUpdateEvent(
        task_id=task_id,
        status=TaskStatus(state=task_state_enum, timestamp=timestamp_str, message=message_obj),
        context_id=context_id,
        final=False,  # Intermediate statuses are not final
    )

Create A2A webhook payload (Task or TaskStatusUpdateEvent).

Per the A2A specification:

- Terminated statuses (completed, failed): returns a Task with the result in artifacts[].parts[]
- Intermediate statuses (working, input-required, submitted): returns a TaskStatusUpdateEvent with the result in status.message.parts[]

This function helps agent implementations construct properly formatted A2A webhook payloads for sending to clients.

Args

task_id
Unique identifier for the task
status
Current task status
context_id
Session/conversation identifier (required by A2A protocol)
timestamp
When the webhook was generated (defaults to current UTC time)
result
Task-specific payload (AdCP response data)

Returns

Task object for terminated statuses, TaskStatusUpdateEvent for intermediate statuses

Examples

Create a completed Task webhook:

>>> from adcp.webhooks import create_a2a_webhook_payload
>>> from adcp.types import GeneratedTaskStatus
>>>
>>> task = create_a2a_webhook_payload(
...     task_id="task_123",
...     status=GeneratedTaskStatus.completed,
...     result={"products": [...]},
...     message="Found 5 products"
... )
>>> # task is a Task object with artifacts containing the result

Create a working status update:

>>> event = create_a2a_webhook_payload(
...     task_id="task_456",
...     status=GeneratedTaskStatus.working,
...     message="Processing 3 of 10 items"
... )
>>> # event is a TaskStatusUpdateEvent with status.message

Send A2A webhook via HTTP POST:

>>> import httpx
>>> from a2a.types import Task
>>>
>>> payload = create_a2a_webhook_payload(...)
>>> # Serialize to dict for JSON
>>> if isinstance(payload, Task):
...     payload_dict = payload.model_dump(mode='json')
... else:
...     payload_dict = payload.model_dump(mode='json')
>>>
>>> response = await httpx.post(webhook_url, json=payload_dict)
def create_mcp_webhook_payload(task_id: str,
status: GeneratedTaskStatus | str,
result: AdcpAsyncResponseData | dict[str, Any] | None = None,
timestamp: datetime | None = None,
task_type: str | None = None,
operation_id: str | None = None,
message: str | None = None,
context_id: str | None = None,
domain: str | None = None,
idempotency_key: str | None = None) ‑> dict[str, typing.Any]
Expand source code
def create_mcp_webhook_payload(
    task_id: str,
    status: GeneratedTaskStatus | str,
    result: AdcpAsyncResponseData | dict[str, Any] | None = None,
    timestamp: datetime | None = None,
    task_type: str | None = None,
    operation_id: str | None = None,
    message: str | None = None,
    context_id: str | None = None,
    domain: str | None = None,
    idempotency_key: str | None = None,
) -> dict[str, Any]:
    """
    Build the dictionary body of an MCP webhook.

    Intended for agent implementations that need a correctly shaped webhook
    payload to send to a client.

    Args:
        task_id: Unique identifier for the task
        status: Current task status (enum member or its string value)
        result: Task-specific payload (AdCP response data). Omitted from the
            payload when ``None``; Pydantic models are dumped with
            ``model_dump(mode="json")``.
        timestamp: When the webhook was generated (defaults to current UTC time)
        task_type: Optional type of AdCP operation (e.g., "get_products",
            "create_media_buy"). Always present in the payload, as ``None``
            when not supplied.
        operation_id: Publisher-defined operation identifier (deprecated from payload,
            should be in URL routing, but included for backward compatibility)
        message: Human-readable summary of task state
        context_id: Session/conversation identifier
        domain: AdCP domain this task belongs to
        idempotency_key: Sender-generated key stable across retries of the same
            event. Defaults to a freshly-generated key — callers retrying
            delivery of the same event MUST pass the key from their first
            attempt; passing None twice mints two keys and defeats dedup.

    Returns:
        Dictionary matching McpWebhookPayload schema, ready to be sent as JSON.
        ``operation_id``, ``message``, ``context_id`` and ``domain`` appear only
        when they were provided.

    Examples:
        Completed webhook with results:
        >>> from adcp.webhooks import create_mcp_webhook_payload
        >>> from adcp.types import GeneratedTaskStatus
        >>>
        >>> payload = create_mcp_webhook_payload(
        ...     task_id="task_123",
        ...     task_type="get_products",
        ...     status=GeneratedTaskStatus.completed,
        ...     result={"products": [...]},
        ...     message="Found 5 products"
        ... )

        Failed webhook with error details:
        >>> payload = create_mcp_webhook_payload(
        ...     task_id="task_456",
        ...     task_type="create_media_buy",
        ...     status=GeneratedTaskStatus.failed,
        ...     result={"errors": [{"code": "INVALID_INPUT", "message": "..."}]},
        ...     message="Validation failed"
        ... )

        Working status update without a result:
        >>> payload = create_mcp_webhook_payload(
        ...     task_id="task_789",
        ...     task_type="sync_creatives",
        ...     status=GeneratedTaskStatus.working,
        ...     message="Processing 3 of 10 creatives"
        ... )
    """
    # Fill in defaults for the two values the sender may leave to the helper.
    event_time = datetime.now(timezone.utc) if timestamp is None else timestamp
    dedup_key = generate_webhook_idempotency_key() if idempotency_key is None else idempotency_key

    # Enum members expose .value; anything else is stringified.
    try:
        status_text = status.value
    except AttributeError:
        status_text = str(status)

    # datetimes go on the wire as ISO 8601; other values pass through untouched.
    if isinstance(event_time, datetime):
        wire_timestamp = event_time.isoformat()
    else:
        wire_timestamp = event_time

    # Core envelope fields are always present, in this order.
    envelope: dict[str, Any] = {
        "idempotency_key": dedup_key,
        "task_id": task_id,
        "task_type": task_type,
        "status": status_text,
        "timestamp": wire_timestamp,
    }

    if result is not None:
        # Pydantic models are reduced to JSON-safe dicts; plain dicts are used as given.
        envelope["result"] = (
            result.model_dump(mode="json") if hasattr(result, "model_dump") else result
        )

    # Remaining optional fields are emitted only when the caller supplied them.
    optional_fields = (
        ("operation_id", operation_id),
        ("message", message),
        ("context_id", context_id),
        ("domain", domain),
    )
    for field_name, field_value in optional_fields:
        if field_value is not None:
            envelope[field_name] = field_value

    return envelope

Create MCP webhook payload dictionary.

This function helps agent implementations construct properly formatted webhook payloads for sending to clients.

Args

task_id
Unique identifier for the task
status
Current task status
task_type
Optionally type of AdCP operation (e.g., "get_products", "create_media_buy")
timestamp
When the webhook was generated (defaults to current UTC time)
result
Task-specific payload (AdCP response data)
operation_id
Publisher-defined operation identifier (deprecated from payload, should be in URL routing, but included for backward compatibility)
message
Human-readable summary of task state
context_id
Session/conversation identifier
domain
AdCP domain this task belongs to
idempotency_key
Sender-generated key stable across retries of the same event. Defaults to a freshly-generated UUID v4 — callers retrying delivery of the same event MUST pass the key from their first attempt; passing None twice mints two keys and defeats dedup.

Returns

Dictionary matching McpWebhookPayload schema, ready to be sent as JSON

Examples

Create a completed webhook with results:

>>> from adcp.webhooks import create_mcp_webhook_payload
>>> from adcp.types import GeneratedTaskStatus
>>>
>>> payload = create_mcp_webhook_payload(
...     task_id="task_123",
...     task_type="get_products",
...     status=GeneratedTaskStatus.completed,
...     result={"products": [...]},
...     message="Found 5 products"
... )

Create a failed webhook with error:

>>> payload = create_mcp_webhook_payload(
...     task_id="task_456",
...     task_type="create_media_buy",
...     status=GeneratedTaskStatus.failed,
...     result={"errors": [{"code": "INVALID_INPUT", "message": "..."}]},
...     message="Validation failed"
... )

Create a working status update:

>>> payload = create_mcp_webhook_payload(
...     task_id="task_789",
...     task_type="sync_creatives",
...     status=GeneratedTaskStatus.working,
...     message="Processing 3 of 10 creatives"
... )
async def deliver(config: AdCPBaseModel | Mapping[str, Any],
payload: AdCPBaseModel | Task | TaskStatusUpdateEvent | Mapping[str, Any],
*,
client: httpx.AsyncClient | None = None,
extra_headers: Mapping[str, str] | None = None,
timeout_seconds: float | None = None,
token_field: str | None = None) ‑> httpx.Response
Expand source code
async def deliver(
    config: AdCPBaseModel | Mapping[str, Any],
    payload: AdCPBaseModel | Task | TaskStatusUpdateEvent | Mapping[str, Any],
    *,
    client: httpx.AsyncClient | None = None,
    extra_headers: Mapping[str, str] | None = None,
    timeout_seconds: float | None = None,
    token_field: str | None = None,
) -> httpx.Response:
    """Send one legacy-auth webhook: build the body, authenticate it, POST it.

    Replaces the sender's hand-written sequence (build envelope, serialize,
    sign, merge headers, POST, echo token) with a single call. The body is
    serialized once with compact separators and those exact bytes are sent
    via ``content=``, so httpx never re-serializes the dict behind the
    signer's back.

    Only the **legacy** AdCP 3.x authentication schemes (``Bearer`` /
    ``HMAC-SHA256``) are supported, and a :class:`DeprecationWarning` is
    emitted on first use. For 4.0+ integrations use :class:`WebhookSender`
    (RFC 9421).

    Args:
        config: A :class:`PushNotificationConfig`, :class:`ReportingWebhook`,
            or equivalent dict. Must carry ``url`` (``https://`` only) and
            ``authentication.{schemes, credentials}``.
        payload: The webhook body: a Pydantic model, an a2a ``Task`` /
            ``TaskStatusUpdateEvent`` (e.g. from
            :func:`create_a2a_webhook_payload`), or a plain dict (e.g. from
            :func:`create_mcp_webhook_payload`). Models are dumped with
            ``mode="json", exclude_none=True``.
        client: Optional shared ``httpx.AsyncClient``. Recommended in
            production for connection pooling and egress-policy enforcement
            (a custom ``httpx.BaseTransport`` is the right place to block
            SSRF to private IPs — the helper validates scheme but cannot
            see post-DNS resolution without racing TOCTOU). A client passed
            in is never closed by this helper.
        extra_headers: Merged last. May not override any of
            ``Content-Type``, ``Content-Digest``, ``Content-Length``,
            ``Host``, ``Authorization``, ``Signature``, ``Signature-Input``,
            ``X-AdCP-Signature``, or ``X-AdCP-Timestamp``, nor any name
            starting with ``:``. Auth and signature-binding headers are
            sender-owned so the signer and the wire cannot disagree.
        timeout_seconds: Per-request timeout applied only when the helper
            creates its own client. Raises ``ValueError`` if set alongside
            ``client=`` — configure the timeout on the shared client instead.
        token_field: Opt-in field name for echoing ``config.token`` into
            the payload body (top-level for MCP dicts, under ``metadata``
            for ``Task`` / ``TaskStatusUpdateEvent``). Default ``None``
            disables echo; there is no spec-defined field name, so the
            caller must pick one the receiver agrees to read.

    Returns:
        The raw ``httpx.Response``. The caller inspects
        ``response.status_code`` and schedules retries. To retry, call
        ``deliver`` again with the *same, unmutated* payload — serialization
        is deterministic, so the body is byte-identical and receivers can
        dedup on ``idempotency_key``. Mutating the payload dict between
        attempts breaks byte-identity. Callers who need byte-identical HTTP
        envelopes across retries (headers included) should use
        :class:`WebhookSender` and :meth:`WebhookSender.resend`; there is
        intentionally no ``resend()`` here.

    Raises:
        ValueError: missing ``url``, non-HTTPS URL, control characters in
            header values, missing / unknown ``authentication`` (use
            :class:`WebhookSender` for RFC 9421), missing credentials, a
            serialized body over the size cap, too many or reserved
            ``extra_headers``, or ``timeout_seconds`` set alongside ``client``.

    Warns:
        DeprecationWarning: fires once; ``authentication`` is a 3.x fallback.

    Security notes:
        * ``config.url`` is buyer-controlled. The helper enforces HTTPS and
          rejects control characters but does NOT block private / link-local
          destinations — wire an egress policy via ``client.transport`` to
          stop SSRF into your VPC or cloud metadata service.
        * ``config.token`` sits in the request body, so any receiver that
          logs bodies retains it indefinitely. Treat the token as a
          medium-sensitivity correlator, not a long-lived secret.
        * At ``httpx`` DEBUG log level, ``Authorization`` and
          ``X-AdCP-Signature`` appear in logs — gate DEBUG in production.
    """
    # A shared client carries its own timeout; refusing both avoids silently
    # ignoring the caller's timeout_seconds.
    if timeout_seconds is not None and client is not None:
        raise ValueError(
            "timeout_seconds cannot be set when client= is provided; "
            "configure the timeout on your shared httpx.AsyncClient instead."
        )

    target_url, push_token, scheme, secret = _extract_config_fields(config)

    if scheme is None:
        raise ValueError(
            "config.authentication is required for deliver(). "
            "For RFC 9421 signing (the AdCP 4.0 default), use "
            "adcp.webhooks.WebhookSender — no helper for unsigned webhooks "
            "is provided because the spec requires signing."
        )
    if scheme not in ("Bearer", "HMAC-SHA256"):
        raise ValueError(
            f"unknown authentication scheme {scheme!r}; "
            "supported legacy schemes are 'Bearer' and 'HMAC-SHA256'. "
            "For RFC 9421 use adcp.webhooks.WebhookSender."
        )

    _warn_auth_deprecation_once()

    envelope = _payload_to_dict(payload)
    # Token echo is opt-in: it needs both a token on the config and a field name.
    if push_token is not None and token_field is not None:
        _validate_header_value("config.token", push_token)
        _inject_push_token(envelope, push_token, payload, token_field)

    # Compact separators pin the on-wire form (adcontextprotocol/adcp#2478).
    # These are the bytes that get size-checked and transmitted.
    wire_body = json.dumps(envelope, separators=(",", ":")).encode("utf-8")
    body_size = len(wire_body)
    if body_size > _MAX_BODY_BYTES:
        raise ValueError(
            f"serialized webhook body is {body_size:,} bytes, over the "
            f"{_MAX_BODY_BYTES:,}-byte cap. Split into smaller webhooks or use "
            "the batch-reporting endpoints — most receivers reject bodies over "
            "10MB at the reverse proxy anyway."
        )

    request_headers: dict[str, str] = {"Content-Type": "application/json"}

    # Both schemes need credentials; only the error wording differs.
    uses_bearer = scheme == "Bearer"
    if not secret:
        if uses_bearer:
            raise ValueError(
                "config.authentication.schemes=['Bearer'] requires "
                "authentication.credentials (min 32 characters — token "
                "exchanged out-of-band with the receiver)."
            )
        raise ValueError(
            "config.authentication.schemes=['HMAC-SHA256'] requires "
            "authentication.credentials (min 32 characters — shared "
            "secret exchanged out-of-band with the receiver)."
        )
    _validate_header_value("authentication.credentials", secret)

    if uses_bearer:
        request_headers["Authorization"] = f"Bearer {secret}"
    else:
        # HMAC-SHA256: the signing helper adds its headers to request_headers in place.
        get_adcp_signed_headers_for_webhook(
            request_headers,
            secret=secret,
            timestamp=str(int(time.time())),
            payload=envelope,
        )

    if extra_headers:
        header_count = len(extra_headers)
        if header_count > _MAX_EXTRA_HEADERS:
            raise ValueError(
                f"extra_headers has {header_count} entries; "
                f"helper caps at {_MAX_EXTRA_HEADERS}. Pass only the custom "
                "headers you actually need (trace IDs, correlation IDs)."
            )
        # First pass rejects reserved names before anything is merged, so a
        # bad entry never leaves the header dict half-updated.
        for header_name in extra_headers:
            lowered = str(header_name).lower()
            if lowered in _RESERVED_HEADERS or lowered.startswith(":"):
                raise ValueError(_reserved_header_message(lowered, header_name))
        # Second pass validates each value and merges it.
        for header_name, header_value in extra_headers.items():
            _validate_header_value(f"extra_headers[{header_name!r}]", header_value)
            request_headers[header_name] = header_value

    # Only a client created here is closed here; a caller-supplied one is left open.
    created_here = client is None
    session = client or httpx.AsyncClient(
        timeout=_DEFAULT_TIMEOUT_SECONDS if timeout_seconds is None else timeout_seconds
    )
    try:
        return await session.post(target_url, content=wire_body, headers=request_headers)
    finally:
        if created_here:
            await session.aclose()

Dispatch one legacy-auth webhook in a single call.

Collapses the sender's six-step boilerplate (build envelope, serialize, sign, merge headers, POST, echo token) into one call so the signer and the wire see the same bytes. The serialization-format drift that plagued the hand-rolled path — json= in httpx re-serializes the dict and breaks Content-Digest — is structurally impossible here: the helper JSON-serializes once, signs those bytes, and POSTs those bytes via content=.

This helper is for the legacy AdCP 3.x authentication schemes (Bearer / HMAC-SHA256) and emits a DeprecationWarning on first use. For 4.0+ integrations use WebhookSender (RFC 9421).

Args

config
A PushNotificationConfig, ReportingWebhook, or equivalent dict. Must carry url (https:// only) and authentication.{schemes, credentials}.
payload
The webhook body. Accepts a Pydantic model, an a2a Task / TaskStatusUpdateEvent (e.g. built via create_a2a_webhook_payload()), or a plain dict (e.g. built via create_mcp_webhook_payload()). Models are dumped with mode="json", exclude_none=True.
client
Optional shared httpx.AsyncClient. Recommended in production for connection pooling and egress-policy enforcement (a custom httpx.BaseTransport is the right place to block SSRF to private IPs — the helper validates scheme but cannot see post-DNS resolution without racing TOCTOU).
extra_headers
Merged last. May not override any of Content-Type, Content-Digest, Content-Length, Host, Authorization, Signature, Signature-Input, X-AdCP-Signature, or X-AdCP-Timestamp. Auth and signature-binding headers are sender-owned so the signer and the wire cannot disagree.
timeout_seconds
Per-request timeout applied only when the helper creates its own client. Raises ValueError if set alongside client= — configure the timeout on the shared client instead.
token_field
Opt-in field name for echoing config.token into the payload body (top-level for MCP dicts, under metadata for Task / TaskStatusUpdateEvent). Default None disables echo; there is no spec-defined field name, so the caller must pick one the receiver agrees to read.

Returns

The raw httpx.Response. The caller is responsible for response.status_code inspection and retry scheduling. For retry, pass the same, unmutated payload again — serialization is deterministic, so retries produce byte-identical bodies (spec-correct receiver dedup via idempotency_key). Mutating the payload dict between attempts breaks byte-identity; callers who need byte-identical HTTP envelopes across retries (including headers) should use WebhookSender and WebhookSender.resend(). There is intentionally no resend() here — the retry contract is "call deliver() again with the same inputs".

Raises

ValueError
Missing url, non-HTTPS URL, control characters in header values, missing / unknown authentication (use WebhookSender for RFC 9421), overriding a reserved header, or setting timeout_seconds alongside client.

Warns

DeprecationWarning (fires once)
authentication is a 3.x fallback.

Security notes

- config.url is buyer-controlled. The helper enforces HTTPS and rejects control characters but does NOT block private / link-local destinations — wire an egress policy via client.transport to stop SSRF into your VPC or cloud metadata service.
- config.token sits in the request body, so any receiver that logs bodies retains it indefinitely. Treat the token as a medium-sensitivity correlator, not a long-lived secret.
- At httpx DEBUG log level, Authorization and X-AdCP-Signature appear in logs — gate DEBUG in production.

def extract_webhook_result_data(webhook_payload: dict[str, Any]) ‑> AdcpAsyncResponseData | None
Expand source code
def _unwrap_first_data_part(parts: Any) -> Any:
    """Return the data of the first DataPart in ``parts``, or ``None`` if there is none.

    Entries that are not dicts, or that lack a ``"data"`` key (e.g. TextParts),
    are skipped. A ``{"response": {...}}`` wrapper with no other keys is
    unwrapped (A2A convention). Anything that is not a list/tuple of parts
    yields ``None`` instead of raising, because the input is unvalidated JSON.
    """
    if not isinstance(parts, (list, tuple)):
        return None

    for part in parts:
        if isinstance(part, dict) and "data" in part:
            data = part["data"]
            if isinstance(data, dict) and len(data) == 1 and "response" in data:
                return data["response"]
            return data

    return None


def extract_webhook_result_data(webhook_payload: dict[str, Any]) -> AdcpAsyncResponseData | None:
    """
    Extract result data from webhook payload (MCP or A2A format).

    This utility function handles webhook payloads from both MCP and A2A protocols,
    extracting the result data regardless of the webhook format. Useful for quick
    inspection, logging, or custom webhook routing logic without requiring full
    client initialization.

    Protocol Detection (checked in this order):
    - A2A Task: Has "artifacts" field (terminated statuses: completed, failed)
    - A2A TaskStatusUpdateEvent: Has nested "status.message" structure (intermediate statuses)
    - MCP: Has "result" field directly

    The payload is treated as untrusted input: malformed A2A structures (an
    artifact or part that is not an object, a ``parts`` value that is not a
    list) produce ``None`` rather than an exception.

    Args:
        webhook_payload: Raw webhook dictionary from HTTP request (JSON-deserialized)

    Returns:
        AdcpAsyncResponseData union type containing the extracted AdCP response, or None
        if no result present. For A2A webhooks, returns the first DataPart found in the
        last artifact (Task) or in status.message (TaskStatusUpdateEvent), unwrapping a
        lone ``{"response": ...}`` wrapper. For MCP webhooks, returns the result field
        directly.

    Examples:
        Extract from MCP webhook:
        >>> mcp_payload = {
        ...     "task_id": "task_123",
        ...     "task_type": "create_media_buy",
        ...     "status": "completed",
        ...     "timestamp": "2025-01-15T10:00:00Z",
        ...     "result": {"media_buy_id": "mb_123", "buyer_ref": "ref_123", "packages": []}
        ... }
        >>> result = extract_webhook_result_data(mcp_payload)
        >>> print(result["media_buy_id"])
        mb_123

        Extract from A2A Task webhook:
        >>> a2a_task_payload = {
        ...     "id": "task_456",
        ...     "context_id": "ctx_456",
        ...     "status": {"state": "completed", "timestamp": "2025-01-15T10:00:00Z"},
        ...     "artifacts": [
        ...         {
        ...             "artifact_id": "artifact_456",
        ...             "parts": [
        ...                 {
        ...                     "data": {
        ...                         "media_buy_id": "mb_456",
        ...                         "buyer_ref": "ref_456",
        ...                         "packages": []
        ...                     }
        ...                 }
        ...             ]
        ...         }
        ...     ]
        ... }
        >>> result = extract_webhook_result_data(a2a_task_payload)
        >>> print(result["media_buy_id"])
        mb_456

        Extract from A2A TaskStatusUpdateEvent webhook:
        >>> a2a_event_payload = {
        ...     "task_id": "task_789",
        ...     "context_id": "ctx_789",
        ...     "status": {
        ...         "state": "working",
        ...         "timestamp": "2025-01-15T10:00:00Z",
        ...         "message": {
        ...             "message_id": "msg_789",
        ...             "role": "agent",
        ...             "parts": [
        ...                 {"data": {"current_step": "processing", "percentage": 50}}
        ...             ]
        ...         }
        ...     },
        ...     "final": False
        ... }
        >>> result = extract_webhook_result_data(a2a_event_payload)
        >>> print(result["percentage"])
        50

        Handle webhook with no result:
        >>> empty_payload = {"task_id": "task_000", "status": "working", "timestamp": "..."}
        >>> result = extract_webhook_result_data(empty_payload)
        >>> print(result)
        None
    """
    # A2A Task format: presence of "artifacts" decides the protocol, even if empty.
    if "artifacts" in webhook_payload:
        artifacts = webhook_payload["artifacts"]
        if not isinstance(artifacts, (list, tuple)) or not artifacts:
            return None

        # Use last artifact (most recent)
        latest_artifact = artifacts[-1]
        if not isinstance(latest_artifact, dict):
            return None

        return cast(
            "AdcpAsyncResponseData | None",
            _unwrap_first_data_part(latest_artifact.get("parts")),
        )

    # A2A TaskStatusUpdateEvent format: nested status.message object.
    status = webhook_payload.get("status")
    if isinstance(status, dict):
        message = status.get("message")
        if isinstance(message, dict):
            # Once a message object is present this is an A2A event; do not
            # fall through to the MCP "result" lookup.
            return cast(
                "AdcpAsyncResponseData | None",
                _unwrap_first_data_part(message.get("parts")),
            )

    # MCP format: result field directly
    return cast("AdcpAsyncResponseData | None", webhook_payload.get("result"))

Extract result data from webhook payload (MCP or A2A format).

This utility function handles webhook payloads from both MCP and A2A protocols, extracting the result data regardless of the webhook format. Useful for quick inspection, logging, or custom webhook routing logic without requiring full client initialization.

Protocol detection:

- A2A Task: has an "artifacts" field (terminated statuses: completed, failed)
- A2A TaskStatusUpdateEvent: has a nested "status.message" structure (intermediate statuses)
- MCP: has a "result" field directly

Args

webhook_payload
Raw webhook dictionary from HTTP request (JSON-deserialized)

Returns

AdcpAsyncResponseData union type containing the extracted AdCP response, or None if no result present. For A2A webhooks, unwraps data from artifacts/message parts structure. For MCP webhooks, returns the result field directly.

Examples

Extract from MCP webhook:

>>> mcp_payload = {
...     "task_id": "task_123",
...     "task_type": "create_media_buy",
...     "status": "completed",
...     "timestamp": "2025-01-15T10:00:00Z",
...     "result": {"media_buy_id": "mb_123", "buyer_ref": "ref_123", "packages": []}
... }
>>> result = extract_webhook_result_data(mcp_payload)
>>> print(result["media_buy_id"])
mb_123

Extract from A2A Task webhook:

>>> a2a_task_payload = {
...     "id": "task_456",
...     "context_id": "ctx_456",
...     "status": {"state": "completed", "timestamp": "2025-01-15T10:00:00Z"},
...     "artifacts": [
...         {
...             "artifact_id": "artifact_456",
...             "parts": [
...                 {
...                     "data": {
...                         "media_buy_id": "mb_456",
...                         "buyer_ref": "ref_456",
...                         "packages": []
...                     }
...                 }
...             ]
...         }
...     ]
... }
>>> result = extract_webhook_result_data(a2a_task_payload)
>>> print(result["media_buy_id"])
mb_456

Extract from A2A TaskStatusUpdateEvent webhook:

>>> a2a_event_payload = {
...     "task_id": "task_789",
...     "context_id": "ctx_789",
...     "status": {
...         "state": "working",
...         "timestamp": "2025-01-15T10:00:00Z",
...         "message": {
...             "message_id": "msg_789",
...             "role": "agent",
...             "parts": [
...                 {"data": {"current_step": "processing", "percentage": 50}}
...             ]
...         }
...     },
...     "final": False
... }
>>> result = extract_webhook_result_data(a2a_event_payload)
>>> print(result["percentage"])
50

Handle webhook with no result:

>>> empty_payload = {"task_id": "task_000", "status": "working", "timestamp": "..."}
>>> result = extract_webhook_result_data(empty_payload)
>>> print(result)
None
def generate_webhook_idempotency_key() ‑> str
Expand source code
def generate_webhook_idempotency_key() -> str:
    """Mint a new random ``idempotency_key`` for a single webhook event.

    The value is the literal prefix ``whk_`` followed by the canonical
    hyphenated text form of a version-4 UUID (40 characters in total). That
    matches the example format in ``webhooks.mdx`` and fits the spec's
    length and charset constraint (``^[A-Za-z0-9_.:-]{16,255}$``).

    Call this exactly once per distinct event and persist the result:
    publishers MUST reuse the same key on every delivery retry. Calling the
    function again for a retry produces a different UUID, which defeats the
    receiver's dedup contract.

    Returns:
        The freshly generated key, e.g. ``whk_<uuid4>``.
    """
    event_uuid = uuid.uuid4()
    return "whk_" + str(event_uuid)

Generate a cryptographically random idempotency_key for a webhook event.

Returns a UUID v4 prefixed with whk_ — matches the example format in webhooks.mdx and stays within the spec's length + charset bounds (^[A-Za-z0-9_.:-]{16,255}$).

Publishers MUST generate this once per distinct event and reuse the same value when retrying delivery. Do NOT call this function again on retry — it would mint a fresh UUID and defeat the dedup contract.

def get_adcp_signed_headers_for_webhook(headers: dict[str, Any],
secret: str,
timestamp: str | int | None,
payload: dict[str, Any] | AdCPBaseModel) ‑> dict[str, typing.Any]
Expand source code
def get_adcp_signed_headers_for_webhook(
    headers: dict[str, Any],
    secret: str,
    timestamp: str | int | None,
    payload: dict[str, Any] | AdCPBaseModel,
) -> dict[str, Any]:
    """
    Add legacy AdCP HMAC signature headers to ``headers`` and return it.

    The signature proves the webhook came from a holder of the shared secret
    and, because the timestamp is part of the signed message, lets receivers
    reject replays outside their accepted time window.

    Two headers are written into the dictionary that was passed in (the
    dictionary is mutated in place; the return value is the same object):
    - X-AdCP-Signature: HMAC-SHA256 signature in format "sha256=<hex_digest>"
    - X-AdCP-Timestamp: Unix timestamp in seconds

    The signing algorithm:
    1. JSON-serializes the payload
    2. Constructs the message as "{timestamp}.{json_payload}"
    3. UTF-8 encodes the message
    4. HMAC-SHA256 signs it with the shared secret
    5. Hex-encodes the digest and prefixes it with "sha256="

    NOTE(review): the serialized body bytes that were actually signed are
    discarded by this function. If the HTTP client re-serializes the payload
    with different JSON separators, the receiver's check will fail; confirm
    the separator convention of the signing helper before relying on
    ``json=payload``, or use :func:`sign_legacy_webhook`, which hands back
    the exact bytes to POST.

    Args:
        headers: Existing headers dictionary; signature headers are added to it
        secret: Shared secret key for HMAC signing
        timestamp: Unix timestamp in seconds (str or int). If None, the current
            time is used.
        payload: Webhook payload (dict or Pydantic model - will be JSON-serialized)

    Returns:
        The same ``headers`` dictionary, now including the signature headers

    Examples:
        Sign and send an MCP webhook:
        >>> import time
        >>> import httpx
        >>> from adcp.webhooks import create_mcp_webhook_payload
        >>> from adcp.webhooks import get_adcp_signed_headers_for_webhook
        >>>
        >>> payload = create_mcp_webhook_payload(
        ...     task_id="task_123",
        ...     task_type="get_products",
        ...     status="completed",
        ...     result={"products": [...]}
        ... )
        >>> headers = {"Content-Type": "application/json"}
        >>> signed_headers = get_adcp_signed_headers_for_webhook(
        ...     headers, secret="my-webhook-secret", timestamp=str(int(time.time())),
        ...     payload=payload,
        ... )
        >>>
        >>> # Send webhook with signed headers (async usage needs AsyncClient;
        >>> # the module-level httpx.post is synchronous and not awaitable)
        >>> async with httpx.AsyncClient() as client:
        ...     response = await client.post(
        ...         webhook_url,
        ...         json=payload,
        ...         headers=signed_headers,
        ...     )

        Headers will contain:
        >>> print(signed_headers)
        {
            "Content-Type": "application/json",
            "X-AdCP-Signature": "sha256=a1b2c3...",
            "X-AdCP-Timestamp": "1773185740"
        }
    """
    # The helper returns (headers, body_bytes); only the header half is used
    # here, the serialized body is intentionally dropped.
    legacy_headers, _unused_body = _compute_legacy_signature(
        payload=payload, secret=secret, timestamp=timestamp
    )
    headers.update(legacy_headers)
    return headers

Generate AdCP-compliant signed headers for webhook delivery.

This function creates a cryptographic signature that proves the webhook came from an authorized agent and protects against replay attacks by including a timestamp in the signed message.

The function adds two headers to the provided headers dict:

- X-AdCP-Signature: HMAC-SHA256 signature in format "sha256=<hex_digest>"
- X-AdCP-Timestamp: Unix timestamp in seconds

The signing algorithm:

1. Constructs message as "{timestamp}.{json_payload}"
2. JSON-serializes payload with default separators (matches wire format from json= kwarg)
3. UTF-8 encodes the message
4. HMAC-SHA256 signs with the shared secret
5. Hex-encodes and prefixes with "sha256="

Args

headers
Existing headers dictionary to add signature headers to
secret
Shared secret key for HMAC signing
timestamp
Unix timestamp in seconds (str or int). If None, uses current time.
payload
Webhook payload (dict or Pydantic model - will be JSON-serialized)

Returns

The modified headers dictionary with signature headers added

Examples

Sign and send an MCP webhook:

>>> import time
>>> from adcp.webhooks import create_mcp_webhook_payload
>>> from adcp.webhooks import get_adcp_signed_headers_for_webhook
>>>
>>> payload = create_mcp_webhook_payload(
...     task_id="task_123",
...     task_type="get_products",
...     status="completed",
...     result={"products": [...]}
... )
>>> headers = {"Content-Type": "application/json"}
>>> signed_headers = get_adcp_signed_headers_for_webhook(
...     headers, secret="my-webhook-secret", timestamp=str(int(time.time())),
...     payload=payload,
... )
>>>
>>> # Send webhook with signed headers
>>> import httpx
>>> response = await httpx.post(
...     webhook_url,
...     json=payload,
...     headers=signed_headers
... )

Headers will contain:

>>> print(signed_headers)
{
    "Content-Type": "application/json",
    "X-AdCP-Signature": "sha256=a1b2c3...",
    "X-AdCP-Timestamp": "1773185740"
}
def sign_legacy_webhook(secret: str,
payload: dict[str, Any] | AdCPBaseModel,
*,
timestamp: str | int | None = None,
headers: dict[str, Any] | None = None) ‑> tuple[dict[str, str], bytes]
Expand source code
def sign_legacy_webhook(
    secret: str,
    payload: dict[str, Any] | AdCPBaseModel,
    *,
    timestamp: str | int | None = None,
    headers: dict[str, Any] | None = None,
) -> tuple[dict[str, str], bytes]:
    """Sign a legacy HMAC webhook and return ``(signed_headers, body_bytes)``.

    The second element is the exact byte string the signature was computed
    over. Callers POST it with ``content=body_bytes`` rather than
    ``json=payload``, so the signed bytes and the transmitted bytes cannot
    diverge — the spaced-vs-compact separator drift that produced silent
    401s in interop is ruled out by construction.

    Use this when you must drive the HTTP transport yourself (custom auth, a
    pre-configured ``httpx.AsyncClient``, a non-httpx client). For the plain
    "send one webhook" case, prefer :func:`deliver`.

    ``body_bytes`` use compact separators (``","``/``":"``), the canonical
    on-wire form pinned by adcontextprotocol/adcp#2478.

    Args:
        secret: Shared secret used for the HMAC.
        payload: Webhook payload, as a dict or an AdCP model.
        timestamp: Signing timestamp; ``None`` defers the choice to the
            signing helper.
        headers: Optional extra headers. When given, a new dict is returned
            containing these (keys and values stringified) overlaid with the
            signature headers; the caller's dict is not modified.

    Returns:
        A ``(headers, body_bytes)`` tuple.

    Example:
        >>> signed, body = sign_legacy_webhook("shared-secret", payload)
        >>> headers = {**signed, "Content-Type": "application/json"}
        >>> await client.post(url, content=body, headers=headers)
    """
    signed, wire_body = _compute_legacy_signature(
        payload=payload, secret=secret, timestamp=timestamp
    )
    if headers is None:
        return signed, wire_body

    # Copy the caller's headers first so that, on a name collision, the
    # freshly computed signature headers take precedence.
    combined = {str(name): str(value) for name, value in headers.items()}
    combined.update(signed)
    return combined, wire_body

Return (signed_headers, body_bytes) for a legacy HMAC webhook.

Byte-equality between signature input and HTTP body is guaranteed — callers POST content=body_bytes instead of json=payload, so the separator-drift trap that caused silent 401s in every spaced-vs-compact interop is structurally impossible here.

This is a lower-level companion to deliver() for callers who need to own the HTTP transport themselves (custom auth, pre-configured httpx.AsyncClient, non-httpx clients). For the one-shot "send a webhook" path, prefer deliver().

The returned body_bytes use compact separators (","/":") matching the canonical on-wire form pinned by adcontextprotocol/adcp#2478.

Example

>>> signed, body = sign_legacy_webhook("shared-secret", payload)
>>> headers = {**signed, "Content-Type": "application/json"}
>>> await client.post(url, content=body, headers=headers)
def sign_webhook(*,
method: str,
url: str,
headers: Mapping[str, str],
body: bytes,
private_key: PrivateKey,
key_id: str,
alg: str,
created: int | None = None,
expires_in_seconds: int = 300,
nonce: str | None = None,
label: str = 'sig1') ‑> SignedHeaders
Expand source code
def sign_webhook(
    *,
    method: str,
    url: str,
    headers: Mapping[str, str],
    body: bytes,
    private_key: PrivateKey,
    key_id: str,
    alg: str,
    created: int | None = None,
    expires_in_seconds: int = DEFAULT_EXPIRES_IN_SECONDS,
    nonce: str | None = None,
    label: str = SIG_LABEL_DEFAULT,
) -> SignedHeaders:
    """Produce signature headers for an outgoing webhook (adcp/webhook-signing/v1).

    This is ``sign_request`` with two arguments fixed for the webhook
    profile: ``cover_content_digest=True`` and ``tag=WEBHOOK_TAG``. Every
    other argument is forwarded untouched. Attach
    ``SignedHeaders.as_dict()`` to the HTTP request you send.

    ``method`` is usually ``"POST"``. It is not normalized or restricted, so
    a caller signing a retried ``PUT`` or another delivery verb does not need
    an extra translation step.

    See also:
        :class:`adcp.webhooks.WebhookSender` — higher-level helper that
        builds the payload, signs, and POSTs in one call. Prefer it unless
        you need to own the HTTP transport yourself.
    """
    return sign_request(
        # Caller-supplied request description and key material.
        method=method,
        url=url,
        headers=headers,
        body=body,
        private_key=private_key,
        key_id=key_id,
        alg=alg,
        # Caller-tunable signature parameters.
        created=created,
        expires_in_seconds=expires_in_seconds,
        nonce=nonce,
        label=label,
        # Pinned by the webhook-signing profile; not configurable here.
        cover_content_digest=True,
        tag=WEBHOOK_TAG,
    )

Sign an outgoing webhook POST per adcp/webhook-signing/v1.

cover_content_digest=True and tag=WEBHOOK_TAG are pinned. The caller attaches SignedHeaders.as_dict() to the outgoing HTTP request.

The method is normally "POST" for webhook delivery; passed through unchanged so callers signing a retried PUT or variant delivery verb are not forced into an extra translation.

See also: WebhookSender — higher-level one-call helper that builds the payload, signs, and POSTs in a single call. Prefer it unless you need to own the HTTP transport yourself.

def verify_webhook_hmac(*,
headers: Mapping[str, str],
body: bytes,
options: LegacyWebhookHmacOptions) ‑> VerifiedLegacyWebhookSender
Expand source code
def verify_webhook_hmac(
    *,
    headers: Mapping[str, str],
    body: bytes,
    options: LegacyWebhookHmacOptions,
) -> VerifiedLegacyWebhookSender:
    """Verify an HMAC-SHA256-signed webhook body per the legacy scheme.

    Fires a one-time :class:`DeprecationWarning` — operators SHOULD migrate
    to 9421 before AdCP 4.0 removes the ``authentication`` field.

    ``headers`` can be any ``Mapping[str, str]`` — ``dict``,
    ``werkzeug.datastructures.EnvironHeaders``, Starlette's ``Headers``, etc.
    Keys are lower-cased internally before lookup.

    :param headers: incoming request headers.
    :param body: raw request body exactly as received; it is never re-parsed.
    :param options: secret, clock reading and skew window to verify against.
    :returns: a :class:`VerifiedLegacyWebhookSender` carrying
        ``options.sender_identity``.
    :raises LegacyWebhookHmacError: when a signature or timestamp header is
        missing, the signature lacks the expected prefix, the timestamp is not
        an integer, the timestamp is outside the window, or the signature
        does not match (including signatures containing non-ASCII text).
    """
    _warn_once()

    header_map = {str(k).lower(): str(v) for k, v in headers.items()}
    sig_value = header_map.get(_SIGNATURE_HEADER)
    ts_value = header_map.get(_TIMESTAMP_HEADER)
    if sig_value is None or ts_value is None:
        raise LegacyWebhookHmacError("missing X-AdCP-Signature or X-AdCP-Timestamp header")
    if not sig_value.startswith(_HEX_PREFIX):
        raise LegacyWebhookHmacError(
            f"signature must start with {_HEX_PREFIX!r}, got {sig_value[:16]!r}"
        )
    hex_sig = sig_value[len(_HEX_PREFIX) :]

    try:
        ts_int = int(ts_value)
    except ValueError as exc:
        raise LegacyWebhookHmacError(f"invalid timestamp {ts_value!r}") from exc

    # Bound on the skew window. Matches the 9421 max window (300s) exactly —
    # the 9421 pipeline applies DEFAULT_SKEW_SECONDS inside its window check,
    # so both schemes have the same "skew budget" on the wire. Do NOT add
    # DEFAULT_SKEW_SECONDS on top; that would double-count and yield a 360s
    # budget for HMAC vs 300s for 9421.
    skew = abs(options.now - ts_int)
    if skew > options.window_seconds:
        raise LegacyWebhookHmacError(
            f"timestamp skew {skew:.0f}s exceeds window {options.window_seconds}s"
        )

    # The sender constructs the message as f"{timestamp}.{json_payload}"
    # where json_payload is the body bytes as UTF-8. Re-decoding a dict would
    # re-serialize with potentially different key order and break the
    # signature — verify against the raw bytes as received.
    message = f"{ts_value}.".encode() + body
    expected = hmac.new(options.secret, message, hashlib.sha256).hexdigest()
    # Constant-time compare on bytes, not str: hmac.compare_digest raises
    # TypeError for str arguments containing non-ASCII characters, and the
    # header value is attacker-controlled. Encoding with "replace" can never
    # turn a bad signature into a valid hex digest; it only guarantees that a
    # malformed header ends in LegacyWebhookHmacError instead of an unhandled
    # exception.
    presented = hex_sig.encode("utf-8", "replace")
    if not hmac.compare_digest(expected.encode("ascii"), presented):
        raise LegacyWebhookHmacError("signature did not match")

    return VerifiedLegacyWebhookSender(sender_identity=options.sender_identity)

Verify an HMAC-SHA256-signed webhook body per the legacy scheme.

Raises LegacyWebhookHmacError on any failure. Fires a one-time DeprecationWarning — operators SHOULD migrate to 9421 before AdCP 4.0 removes the authentication field.

headers can be any Mapping[str, str] — dict, werkzeug.datastructures.EnvironHeaders, Starlette's Headers, etc. Keys are case-folded internally.

def verify_webhook_signature(*,
method: str,
url: str,
headers: Mapping[str, str],
body: bytes,
options: WebhookVerifyOptions) ‑> VerifiedWebhookSender
Expand source code
def verify_webhook_signature(
    *,
    method: str,
    url: str,
    headers: Mapping[str, str],
    body: bytes,
    options: WebhookVerifyOptions,
) -> VerifiedWebhookSender:
    """Check an incoming webhook's signature against adcp/webhook-signing/v1.

    The webhook-specific precheck runs first. The request is then verified
    with the generic request verifier, configured for the ``"webhook"``
    operation with a required content digest, the webhook tag and the
    webhook ``adcp_use`` value.

    :returns: a :class:`VerifiedWebhookSender` built from the verified
        signer; use it as the identity to scope dedup state by.
    :raises SignatureVerificationError: with a ``webhook_signature_*`` code
        when verification fails (errors from the generic verifier are
        re-tagged before being re-raised).
    """
    _precheck_webhook_has_required_components(headers)

    # Sample the clock once, before anything else is built, so the whole
    # verification runs against a single "now".
    verification_time = options.clock()
    webhook_capability = VerifierCapability(
        supported=True,
        covers_content_digest="required",
        required_for=frozenset({"webhook"}),
    )
    request_options = VerifyOptions(
        now=verification_time,
        capability=webhook_capability,
        operation="webhook",
        jwks_resolver=options.jwks_resolver,
        replay_store=options.replay_store,
        revocation_checker=options.revocation_checker,
        revocation_list=options.revocation_list,
        max_skew_seconds=options.max_skew_seconds,
        max_window_seconds=options.max_window_seconds,
        label=options.label,
        expected_tag=WEBHOOK_TAG,
        expected_adcp_use=ADCP_USE_WEBHOOK,
        allowed_algs=options.allowed_algs,
        agent_url=options.sender_url,
    )

    try:
        signer: VerifiedSigner = verify_request_signature(
            method=method,
            url=url,
            headers=headers,
            body=body,
            options=request_options,
        )
    except SignatureVerificationError as exc:
        # Surface the failure under the webhook error namespace while
        # keeping the original exception as the cause.
        raise _retag_to_webhook(exc) from exc

    return VerifiedWebhookSender(
        key_id=signer.key_id,
        alg=signer.alg,
        label=signer.label,
        verified_at=signer.verified_at,
        sender_url=signer.agent_url,
    )

Verify an incoming signed webhook per the adcp/webhook-signing/v1 profile.

Raises SignatureVerificationError with a webhook_signature_* code on failure. Success returns a VerifiedWebhookSender carrying the identity to scope dedup state by.

Classes

class LegacyHmacFallback (options_for: Callable[[Mapping[str, str]], LegacyWebhookHmacOptions | None],
only_when_9421_absent: bool = True)
Expand source code
@dataclass(frozen=True)
class LegacyHmacFallback:
    """Receiver policy that opts in to HMAC-SHA256 senders during 3.x migration.

    Without this policy the receiver rejects every request that fails 9421
    verification. Supplying an instance to ``WebhookReceiverConfig`` lets
    HMAC-signed webhooks through as a fallback.

    :param options_for: callback invoked with the incoming request headers.
        It resolves the sender (from Bearer, hostname, or legacy
        shared-secret tag) and returns a populated
        :class:`LegacyWebhookHmacOptions` holding that sender's secret and
        ``sender_identity``. Returning ``None`` declines the fallback for the
        request, which is then rejected along the 9421-only failure path.
    :param only_when_9421_absent: when ``True`` (default), the fallback is
        considered only for requests carrying no 9421 headers at all. A
        request whose 9421 headers are present but FAIL verification is still
        rejected, which blocks a downgrade attack where a MITM strips the
        9421 signature and substitutes a forged HMAC one for a secret it
        knows. When ``False``, HMAC is attempted after any 9421 failure; use
        that only for testing or known homogenous sender cohorts.
    """

    options_for: Callable[[Mapping[str, str]], LegacyWebhookHmacOptions | None]
    only_when_9421_absent: bool = True

    @classmethod
    def from_shared_secret(
        cls,
        *,
        secret: bytes,
        sender_identity: str,
        only_when_9421_absent: bool = True,
        window_seconds: int = 300,
    ) -> LegacyHmacFallback:
        """Build a policy for the "one secret, one sender" setup.

        Fits the common 3.x migration case: exactly one publisher is still on
        the legacy scheme and is bound to a fixed ``sender_identity``
        (typically a buyer-defined string). The generated callback ignores
        the request headers and reads the wall clock each time it is called.
        For multi-sender or header-derived identities, pass your own
        ``options_for`` to the constructor instead.
        """
        import time as _time

        def _fixed_sender_options(_headers: Mapping[str, str]) -> LegacyWebhookHmacOptions:
            # ``now`` is read per request, not captured when the policy is built.
            current_epoch = _time.time()
            return LegacyWebhookHmacOptions(
                secret=secret,
                sender_identity=sender_identity,
                now=current_epoch,
                window_seconds=window_seconds,
            )

        policy = cls(
            options_for=_fixed_sender_options,
            only_when_9421_absent=only_when_9421_absent,
        )
        return policy

Opt-in policy for accepting HMAC-SHA256 senders during 3.x migration.

The default behavior of the receiver is to reject any request that fails 9421 verification. Pass an instance of this class to WebhookReceiverConfig to accept HMAC-signed webhooks as a fallback.

Parameters:

- options_for: callback that returns a populated LegacyWebhookHmacOptions given the incoming request headers. Your implementation resolves the sender (from Bearer, hostname, or legacy shared-secret tag) and returns the secret + sender_identity tuple the verifier needs. Return None to decline the fallback for this request (rejection follows the 9421-only failure path).
- only_when_9421_absent: when True (default), HMAC fallback only fires when no 9421 headers are present at all. When a request carries 9421 headers that FAIL verification, it still rejects — preventing a downgrade attack where a MITM strips the 9421 signature and replaces it with a forged HMAC one it knows the secret for. When False, HMAC is tried on any 9421 failure; only set this for testing or known homogenous sender cohorts.

Static methods

def from_shared_secret(*,
secret: bytes,
sender_identity: str,
only_when_9421_absent: bool = True,
window_seconds: int = 300) ‑> LegacyHmacFallback

Convenience constructor for the "one secret, one sender" case.

Covers the common 3.x migration setup where the receiver has exactly one publisher on the legacy scheme and binds them to a known sender_identity (typically a buyer-defined string). For multi-sender or header-derived-identity setups, construct with an options_for callback directly.

Instance variables

var only_when_9421_absent : bool
var options_for : Callable[[Mapping[str, str]], LegacyWebhookHmacOptions | None]
class LegacyWebhookHmacError (reason: str)
Expand source code
class LegacyWebhookHmacError(Exception):
    """Signals that verification on the legacy HMAC-SHA256 path failed.

    Kept separate from :class:`adcp.signing.errors.SignatureVerificationError`
    so that a caller diagnosing a 401 can tell which scheme rejected the
    request: the legacy path raises this type, the 9421 path raises the other.

    :param reason: human-readable explanation; also used as the exception
        message and stored on the instance as ``reason``.
    """

    def __init__(self, reason: str) -> None:
        # Expose the explanation as an attribute so handlers can log it
        # without going through ``str(exc)`` or ``exc.args``.
        self.reason = reason
        super().__init__(reason)

Raised when HMAC-SHA256 legacy verification fails.

Distinct from SignatureVerificationError so callers can distinguish legacy-path failures from 9421-path failures — operators want to know which scheme fired when diagnosing a 401.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class LegacyWebhookHmacOptions (secret: bytes, sender_identity: str, now: float, window_seconds: int = 300)
Expand source code
@dataclass(frozen=True)
class LegacyWebhookHmacOptions:
    """Options for the HMAC verifier.

    One instance describes how to verify a single incoming request. The
    receiver is responsible for working out which sender the request belongs
    to (Bearer token, IP allowlist, hostname) and for supplying the secret
    bound to that sender; this class only carries the result of that lookup.

    :param secret: the shared secret, as bytes, used as the HMAC-SHA256 key
        when the verifier recomputes the signature.
    :param sender_identity: the string used to scope dedup after verification
        succeeds. In HMAC-legacy, there is no cryptographic sender identity
        (the secret IS the identity), so the caller provides one — typically
        derived from the same lookup that produced the secret.
    :param now: current time, epoch seconds. Required; there is no default,
        so the caller must read the clock when building the options.
    :param window_seconds: accepted skew. Sender timestamp outside ``[now -
        window, now + window]`` rejects. Defaults to the module's default
        window.
    """

    # HMAC key for this request's sender.
    secret: bytes
    # Identity reported to the caller once verification succeeds.
    sender_identity: str
    # Clock reading (epoch seconds) the sender's timestamp is compared against.
    now: float
    # Maximum absolute difference, in seconds, between ``now`` and the
    # sender's timestamp.
    window_seconds: int = _DEFAULT_WINDOW_SECONDS

Options for the HMAC verifier.

Parameters:

- secret: the shared secret (bytes) for this incoming request, used as the HMAC key. The receiver is responsible for determining sender identity from headers (Bearer token, IP allowlist, hostname) and looking up the secret bound to that sender before building the options.
- sender_identity: the string used to scope dedup after verification succeeds. In HMAC-legacy, there is no cryptographic sender identity (the secret IS the identity), so the caller provides one — typically derived from the same lookup that produced the secret.
- now: current time, epoch seconds. Required; the caller supplies it when building the options.
- window_seconds: accepted skew. Sender timestamp outside [now - window, now + window] rejects.

Instance variables

var now : float
var secret : bytes
var sender_identity : str
var window_seconds : int
class MemoryBackend (*, clock: Callable[[], float] = <built-in function time>)
Expand source code
class MemoryBackend(IdempotencyBackend):
    """Idempotency backend that keeps everything in a plain in-process dict.

    Good for tests, single-process reference implementations, and local
    development. **Not suitable for multi-process deployments**: every
    worker holds its own private cache, so a retry routed to a different
    worker looks like a brand-new request.

    Concurrency: every read and write of the dict happens while holding one
    :class:`asyncio.Lock`, so coroutines calling ``get``/``put``
    concurrently are serialized. Note that :class:`asyncio.Lock` coordinates
    coroutines only; it is not a thread-synchronization primitive.

    :param clock: Callable returning the current epoch seconds. Override it
        in tests to move time forward deterministically instead of
        monkeypatching :mod:`time`. Defaults to :func:`time.time`.
    """

    def __init__(self, *, clock: Callable[[], float] = time.time) -> None:
        self._clock = clock
        self._lock = asyncio.Lock()
        # Keyed by (principal_id, idempotency key).
        self._store: dict[tuple[str, str], CachedResponse] = {}

    async def get(
        self, principal_id: str, key: str
    ) -> CachedResponse | None:
        """Return the live entry for ``(principal_id, key)``, or ``None``.

        An entry whose ``expires_at_epoch`` is at or before the current clock
        reading is removed and reported as missing.
        """
        slot = (principal_id, key)
        async with self._lock:
            cached = self._store.get(slot)
            if cached is None:
                return None
            if cached.expires_at_epoch <= self._clock():
                # Expire lazily: evict on read so the next request sees an
                # empty slot and repopulates it.
                del self._store[slot]
                return None
            return cached

    async def put(
        self,
        principal_id: str,
        key: str,
        entry: CachedResponse,
    ) -> None:
        """Store ``entry`` under ``(principal_id, key)``, replacing any previous value."""
        slot = (principal_id, key)
        async with self._lock:
            self._store[slot] = entry

    async def delete_expired(self, now_epoch: float | None = None) -> int:
        """Evict every entry expired as of ``now_epoch`` and return how many were removed.

        When ``now_epoch`` is ``None`` the backend's own clock is consulted.
        """
        cutoff = self._clock() if now_epoch is None else now_epoch
        async with self._lock:
            # Collect first, delete second: the dict must not change size
            # while it is being iterated.
            expired_slots = [
                slot
                for slot, cached in self._store.items()
                if cached.expires_at_epoch <= cutoff
            ]
            for slot in expired_slots:
                del self._store[slot]
            return len(expired_slots)

    async def clear(self) -> None:
        """Drop every cached entry.

        Intended as a test-suite hook: resets state between fixtures when one
        :class:`MemoryBackend` instance is shared across several tests.
        """
        async with self._lock:
            self._store.clear()

    async def _size(self) -> int:
        """Test-only: report how many entries are currently stored."""
        async with self._lock:
            entry_count = len(self._store)
        return entry_count

In-process dict-backed store.

Suitable for tests, single-process reference implementations, and local development. Not suitable for multi-process deployments — each worker has its own cache, so a retry that lands on a different worker is treated as a fresh request.

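Concurrency: the backend uses an asyncio.Lock to serialize mutations of the shared dict. Reads go through the lock too; for a pure in-process backend this is cheap and prevents torn reads across concurrent get/put interleaving. Note that asyncio.Lock coordinates coroutines on an event loop; it does not make the backend safe to share between OS threads.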

Parameter clock: Callable returning the current epoch seconds. Override for tests that need to advance time deterministically without monkeypatching the time module. Defaults to time.time.

Ancestors

Methods

async def clear(self) ‑> None
Expand source code
async def clear(self) -> None:
    """Remove all cached entries.

    Test-suite hook — handy for resetting state between fixtures when a
    single :class:`MemoryBackend` is shared across multiple tests.

    The store is emptied while holding the backend's lock, so the wipe does
    not interleave with another coroutine's ``get``/``put``.
    """
    async with self._lock:
        self._store.clear()

Remove all cached entries.

Test-suite hook — handy for resetting state between fixtures when a single :class:MemoryBackend is shared across multiple tests.

Inherited members

class VerifiedLegacyWebhookSender (sender_identity: str)
Expand source code
@dataclass(frozen=True)
class VerifiedLegacyWebhookSender:
    """Identity returned by the HMAC verifier on success.

    Shape-compatible with the 9421 ``VerifiedWebhookSender.as_sender_identity``
    so downstream dedup code treats both the same.
    """

    # Caller-supplied identity string; the HMAC verifier copies it from the
    # options it was given.
    sender_identity: str

    def as_sender_identity(self) -> str:
        """Return the stored ``sender_identity`` unchanged."""
        return self.sender_identity

Identity returned by the HMAC verifier on success.

Shape-compatible with the 9421 VerifiedWebhookSender.as_sender_identity() so downstream dedup code treats both the same.

Instance variables

var sender_identity : str

Methods

def as_sender_identity(self) ‑> str
Expand source code
def as_sender_identity(self) -> str:
    """Return the stored ``sender_identity`` unchanged."""
    return self.sender_identity
class VerifiedSignerLike (*args, **kwargs)
Expand source code
@runtime_checkable
class VerifiedSignerLike(Protocol):
    """Anything with ``as_sender_identity() -> str``.

    Both :class:`VerifiedWebhookSender` (9421) and :class:`VerifiedLegacyWebhookSender`
    (HMAC) implement this shape, so the receiver treats both verification
    paths identically downstream.

    The protocol is ``runtime_checkable``, so ``isinstance`` checks against
    it are allowed; such checks only confirm the method exists, not its
    signature.
    """

    # Structural stub: implementers return the string used to identify the
    # verified sender.
    def as_sender_identity(self) -> str: ...

Anything with as_sender_identity() -> str.

Both VerifiedWebhookSender (9421) and VerifiedLegacyWebhookSender (HMAC) implement this shape, so the receiver treats both verification paths identically downstream.

Ancestors

  • typing.Protocol
  • typing.Generic

Methods

def as_sender_identity(self) ‑> str
Expand source code
# Protocol stub with no implementation: implementers return the string used
# to identify the verified sender.
def as_sender_identity(self) -> str: ...
class VerifiedWebhookSender (key_id: str,
alg: str,
label: str,
verified_at: float,
sender_url: str | None = None)
Expand source code
@dataclass(frozen=True)
class VerifiedWebhookSender:
    """Returned on successful webhook verification.

    Distinct type from :class:`VerifiedSigner` so a caller that mistakenly
    passes a request-verified signer into a webhook-scoped dedup store (or the
    reverse) will fail to type-check. Both carry the same bytes; the type
    separation is a guardrail, not a data difference.
    """

    # Identifier of the key that produced the verified signature.
    key_id: str
    # Signature algorithm reported by the verifier.
    alg: str
    # Signature label that was verified.
    label: str
    # Verification time as reported by the verifier.
    verified_at: float
    # URL of the sending agent, when the verifier was given one.
    sender_url: str | None = None

    def as_sender_identity(self) -> str:
        """Identity string used to scope dedup state.

        Webhook dedup MUST be scoped to the authenticated sender — trusting a
        payload field for identity is the attack-surface hole the spec's
        "Sender requirements" paragraph calls out. The key_id is the
        cryptographically verified identity. When a sender URL is present the
        result is ``sender_url|key_id`` (pipe-separated), which tolerates
        JWKS reuse across co-deployed senders; otherwise it is the bare
        ``key_id``.

        Returns:
            ``"<sender_url>|<key_id>"`` if ``sender_url`` is not ``None``
            (an empty string still counts as present), else ``key_id``.
        """
        if self.sender_url is None:
            return self.key_id
        return f"{self.sender_url}|{self.key_id}"

Returned on successful webhook verification.

Distinct type from VerifiedSigner so a caller that mistakenly passes a request-verified signer into a webhook-scoped dedup store (or the reverse) will fail to type-check. Both carry the same bytes; the type separation is a guardrail, not a data difference.

Instance variables

var alg : str
var key_id : str
var label : str
var sender_url : str | None
var verified_at : float

Methods

def as_sender_identity(self) ‑> str
Expand source code
def as_sender_identity(self) -> str:
    """Identity string used to scope dedup state.

    Webhook dedup MUST be scoped to the authenticated sender — trusting a
    payload field for identity is the attack-surface hole the spec's
    "Sender requirements" paragraph calls out. The key_id is the
    cryptographically verified identity; when a sender URL is present the
    result is ``sender_url|key_id`` (pipe-separated) to tolerate JWKS reuse
    across co-deployed senders, otherwise it is the bare ``key_id``.
    """
    if self.sender_url is not None:
        return f"{self.sender_url}|{self.key_id}"
    return self.key_id

Identity string used to scope dedup state.

Webhook dedup MUST be scoped to the authenticated sender — trusting a payload field for identity is the attack-surface hole the spec's "Sender requirements" paragraph calls out. The key_id is the cryptographically verified identity; when a sender URL is present the identity is sender_url|key_id (pipe-separated), which tolerates JWKS reuse across co-deployed senders.

class WebhookDedupStore (backend: IdempotencyBackend,
ttl_seconds: int = 86400,
*,
namespace: str = 'webhook',
clock: Callable[[], float] = <built-in function time>)
Expand source code
class WebhookDedupStore:
    """Suppress retried webhooks by remembering ``(sender_id, idempotency_key)``.

    :param backend: any :class:`IdempotencyBackend`. The same MemoryBackend or
        PgBackend type used by :class:`IdempotencyStore` is fine — every
        sender ID is prefixed with ``namespace`` so request-side and
        webhook-side scopes can't alias even when sharing one backend
        instance.
    :param ttl_seconds: replay window. Must be within ``[86400, 604800]`` per
        the spec minimum. Defaults to 86400 (24h).
    :param namespace: prefix applied to every ``sender_id`` before it hits
        the backend. Defaults to ``"webhook"``, which is safe when the same
        backend is shared with :class:`IdempotencyStore` (request-side keys
        are scoped by a principal_id that isn't wrapped in this namespace,
        so collisions are impossible). Override only if you run multiple
        webhook scopes against one backend (e.g., separate dedup spaces for
        task webhooks vs list-change webhooks).
    :param clock: zero-argument callable returning the current epoch time in
        seconds; used to compute entry expiry.
    :raises ValueError: if ``ttl_seconds`` is outside the allowed window or
        ``namespace`` is empty.
    """

    def __init__(
        self,
        backend: IdempotencyBackend,
        ttl_seconds: int = _MIN_TTL_SECONDS,
        *,
        namespace: str = "webhook",
        clock: Callable[[], float] = time.time,
    ) -> None:
        ttl_in_window = _MIN_TTL_SECONDS <= ttl_seconds <= _MAX_TTL_SECONDS
        if not ttl_in_window:
            raise ValueError(
                f"ttl_seconds must be in [{_MIN_TTL_SECONDS}, {_MAX_TTL_SECONDS}] "
                f"per webhook spec minimum, got {ttl_seconds}"
            )
        if not namespace:
            raise ValueError("namespace must be a non-empty string")

        self.backend = backend
        self.ttl_seconds = ttl_seconds
        self.namespace = namespace
        self._clock = clock

    async def check_and_record(self, sender_id: str, idempotency_key: str) -> bool:
        """Report whether the pair is first-seen, recording it if so.

        Returns ``True`` when the pair has not been seen before (the event
        should be processed) and ``False`` on a duplicate (the caller MUST
        still return 2xx to the sender — delivery succeeded, it's a retry).

        Race note: this is a ``get`` followed by a ``put``; the pair of calls
        is only atomic if the backend makes it so. MemoryBackend serializes
        each call under an ``asyncio.Lock`` but does NOT bracket them
        together, so two concurrent retries of the same event CAN both
        observe "first-seen" and both be processed. That is tolerated: the
        guarantee is "at most once per replay window in the common case",
        and the at-least-once contract already tells callers to cope with a
        duplicated side effect. PgBackend implementations SHOULD use
        ``INSERT ... ON CONFLICT DO NOTHING`` returning ``rowcount`` for
        lock-free atomicity.

        :raises ValueError: if ``sender_id`` or ``idempotency_key`` is empty.
        """
        for arg_name, arg_value in (
            ("sender_id", sender_id),
            ("idempotency_key", idempotency_key),
        ):
            if not arg_value:
                raise ValueError(f"{arg_name} must be a non-empty string")

        backend_scope = f"{self.namespace}:{sender_id}"
        # Only a short prefix of the key is ever written to the logs.
        key_prefix = idempotency_key[:8]

        if await self.backend.get(backend_scope, idempotency_key) is not None:
            logger.debug(
                "webhook dedup: duplicate sender=%s key_prefix=%s",
                sender_id,
                key_prefix,
            )
            return False

        # The stored value is a placeholder; only its presence and expiry
        # matter for dedup.
        marker = CachedResponse(
            payload_hash=_SENTINEL_HASH,
            response={},
            expires_at_epoch=self._clock() + self.ttl_seconds,
        )
        try:
            await self.backend.put(backend_scope, idempotency_key, marker)
        except Exception:
            # Fail open, like the request-side store: a failed write means
            # the event MIGHT be reprocessed on retry, not that it is
            # dropped. Raising here would look like a handler failure to
            # the sender.
            logger.warning(
                "webhook dedup put failed for sender=%s key_prefix=%s — "
                "event processed but next retry will reprocess",
                sender_id,
                key_prefix,
                exc_info=True,
            )
        return True

Dedup (sender_id, idempotency_key) pairs to suppress retried webhooks.

:param backend: any :class:IdempotencyBackend. Same MemoryBackend or PgBackend type used by :class:IdempotencyStore is fine — the namespace parameter prefixes all sender IDs so request-side and webhook-side scopes can't alias even when sharing one backend instance. :param ttl_seconds: replay window. Must be within [86400, 604800] per the spec minimum. Defaults to 86400 (24h). :param namespace: prefix applied to every sender_id before it hits the backend. Defaults to "webhook", which is safe when the same backend is shared with :class:IdempotencyStore (request-side keys are scoped by a principal_id that isn't wrapped in this namespace, so collisions are impossible). Override only if you run multiple webhook scopes against one backend (e.g., separate dedup spaces for task webhooks vs list-change webhooks).

Methods

async def check_and_record(self, sender_id: str, idempotency_key: str) ‑> bool
Expand source code
async def check_and_record(self, sender_id: str, idempotency_key: str) -> bool:
    """Report whether the pair is first-seen, recording it if so.

    Returns ``True`` when the pair has not been seen before (the event
    should be processed) and ``False`` on a duplicate (the caller MUST still
    return 2xx to the sender — delivery succeeded, it's a retry).

    Race note: this is a ``get`` followed by a ``put``; the pair of calls is
    only atomic if the backend makes it so. MemoryBackend serializes each
    call under an ``asyncio.Lock`` but does NOT bracket them together, so
    two concurrent retries of the same event CAN both observe "first-seen"
    and both be processed. That is tolerated: the guarantee is "at most once
    per replay window in the common case", and the at-least-once contract
    already tells callers to cope with a duplicated side effect. PgBackend
    implementations SHOULD use ``INSERT ... ON CONFLICT DO NOTHING``
    returning ``rowcount`` for lock-free atomicity.

    :raises ValueError: if ``sender_id`` or ``idempotency_key`` is empty.
    """
    for arg_name, arg_value in (
        ("sender_id", sender_id),
        ("idempotency_key", idempotency_key),
    ):
        if not arg_value:
            raise ValueError(f"{arg_name} must be a non-empty string")

    backend_scope = f"{self.namespace}:{sender_id}"
    # Only a short prefix of the key is ever written to the logs.
    key_prefix = idempotency_key[:8]

    if await self.backend.get(backend_scope, idempotency_key) is not None:
        logger.debug(
            "webhook dedup: duplicate sender=%s key_prefix=%s",
            sender_id,
            key_prefix,
        )
        return False

    # The stored value is a placeholder; only its presence and expiry matter
    # for dedup.
    marker = CachedResponse(
        payload_hash=_SENTINEL_HASH,
        response={},
        expires_at_epoch=self._clock() + self.ttl_seconds,
    )
    try:
        await self.backend.put(backend_scope, idempotency_key, marker)
    except Exception:
        # Fail open, like the request-side store: a failed write means the
        # event MIGHT be reprocessed on retry, not that it is dropped.
        # Raising here would look like a handler failure to the sender.
        logger.warning(
            "webhook dedup put failed for sender=%s key_prefix=%s — "
            "event processed but next retry will reprocess",
            sender_id,
            key_prefix,
            exc_info=True,
        )
    return True

Atomically check for first-seen and record if new.

Returns True when the pair is first-seen (event should be processed), False on duplicate (caller MUST still return 2xx to the sender — the event was delivered successfully, it's just a retry).

Race note: the check-then-put pattern is not atomic across concurrent callers unless the backend provides its own atomicity. MemoryBackend serializes individual get and put under an asyncio.Lock but does NOT bracket them together — two concurrent retries of the same event CAN both observe "first-seen" and both process the event. That's a tolerable failure mode: the ultimate guarantee is "at most once per replay window in the common case"; a concurrent retry arriving in the same few milliseconds is rare and, if it happens, produces the same "duplicated side effect" outcome the at-least-once contract already warns callers to tolerate. PgBackend implementations SHOULD use INSERT … ON CONFLICT DO NOTHING returning rowcount for lock-free atomicity.

class WebhookDeliveryResult (status_code: int,
idempotency_key: str,
url: str,
response_headers: Mapping[str, str],
response_body: bytes,
sent_body: bytes = b'',
sent_extra_headers: Mapping[str, str] = <factory>)
Expand source code
@dataclass(frozen=True)
class WebhookDeliveryResult:
    """Immutable record of what one ``send_*`` call sent and got back.

    What senders usually want from it: whether the delivery landed (``ok``),
    which idempotency key was used (for logs and retry), and what the
    receiver answered (``status_code``, ``response_body``).

    ``sent_body`` and ``sent_extra_headers`` hold exactly what was signed and
    POSTed. :meth:`WebhookSender.resend` replays them under a fresh signature
    (keeping ``idempotency_key`` so the receiver can dedup) instead of
    re-serializing a user-supplied dict, which would drift if any field
    (``timestamp``, nested ``result``) differed between calls.
    """

    # --- receiver's answer and delivery metadata ---
    status_code: int
    idempotency_key: str
    url: str
    response_headers: Mapping[str, str]
    response_body: bytes
    # --- exact bytes/headers that went out; defaults are empty ---
    sent_body: bytes = b""
    sent_extra_headers: Mapping[str, str] = field(default_factory=dict)

    @property
    def ok(self) -> bool:
        """Whether ``status_code`` is a 2xx.

        Receivers MUST answer 2xx for duplicates as well, so a 200 carrying
        ``duplicate=true`` in its body still counts as ``ok``.
        """
        code = self.status_code
        return code >= 200 and code < 300

Outcome of one send_* call.

Senders care about: did it land (ok), what key was used (for logs and retry), what did the receiver say (status_code, response_body).

The sent_body and sent_extra_headers fields capture exactly what was signed and POSTed — the sender's :meth:WebhookSender.resend() replays them under a fresh signature (preserving idempotency_key for dedup) rather than re-serializing from a user-supplied dict, which would drift if any field (timestamp, nested result) differs between calls.

Instance variables

var idempotency_key : str
prop ok : bool
Expand source code
@property
def ok(self) -> bool:
    """Whether ``status_code`` is a 2xx.

    Receivers MUST answer 2xx for duplicates as well, so a 200 carrying
    ``duplicate=true`` in its body still counts as ``ok``.
    """
    code = self.status_code
    return code >= 200 and code < 300

True on 2xx. Note: receivers MUST return 2xx on duplicates too, so a 200 with duplicate=true in the body is still ok.

var response_body : bytes
var response_headers : Mapping[str, str]
var sent_body : bytes
var sent_extra_headers : Mapping[str, str]
var status_code : int
var url : str
class WebhookOutcome (rejected: bool = False,
rejection_reason: RejectionReason | None = None,
response_headers: Mapping[str, str] = <factory>,
sender_identity: str | None = None,
payload: WebhookPayload | None = None,
duplicate: bool = False,
idempotency_key: str | None = None)
Expand source code
@dataclass(frozen=True)
class WebhookOutcome:
    """Result of a single ``receive`` call.

    Exactly one of ``rejected`` or ``payload`` is set. ``duplicate=True`` is
    compatible with a non-None ``payload`` — the payload parsed fine, the
    signature verified fine, it's just a retry the caller should 200 away.

    The dataclass is frozen and performs no validation of its own: the
    invariant above is a convention of the code that builds outcomes, not
    something enforced here (an all-defaults instance has neither set).
    """

    # True when the webhook was refused; ``rejection_reason`` then says why.
    rejected: bool = False
    # Machine-readable refusal cause; None when the webhook was accepted.
    rejection_reason: RejectionReason | None = None
    # Headers the caller should copy onto its HTTP response; empty by default.
    response_headers: Mapping[str, str] = field(default_factory=dict)
    # Populated on successful verify (even when rejected downstream of crypto)
    sender_identity: str | None = None
    # Populated on successful verify + parse
    payload: WebhookPayload | None = None
    # True when this (sender, idempotency_key) pair was already recorded.
    duplicate: bool = False
    # The payload's ``idempotency_key``; None on rejected outcomes.
    idempotency_key: str | None = None

Result of a single receive call.

Exactly one of rejected or payload is set. duplicate=True is compatible with a non-None payload — the payload parsed fine, the signature verified fine, it's just a retry the caller should 200 away.

Instance variables

var duplicate : bool
var idempotency_key : str | None
var payload : McpWebhookPayload | RevocationNotification | CollectionListChangedWebhook | PropertyListChangedWebhook | ArtifactWebhookPayload | None
var rejected : bool
var rejection_reason : Literal['signature_missing', 'signature_invalid', 'signature_legacy_failed', 'content_type_invalid', 'body_invalid_json', 'payload_invalid', 'idempotency_key_missing', 'idempotency_key_invalid'] | None
var response_headers : Mapping[str, str]
var sender_identity : str | None
class WebhookReceiver (config: WebhookReceiverConfig)
Expand source code
class WebhookReceiver:
    """Stateless webhook entry point, one instance per receiver configuration.

    Instance state (``config``) is read-only after construction. Per-request
    state lives in the :class:`WebhookOutcome` returned from :meth:`receive`.
    """

    def __init__(self, config: WebhookReceiverConfig) -> None:
        """Keep a reference to ``config``; no other state is created."""
        self._config = config

    async def receive(
        self,
        *,
        method: str,
        url: str,
        headers: Mapping[str, str],
        body: bytes,
    ) -> WebhookOutcome:
        """Verify, dedupe, parse. Returns a :class:`WebhookOutcome`.

        Steps, in order: content-type check, signature verification, JSON
        decode, ``idempotency_key`` presence/format check, payload model
        validation, and finally the dedup lookup. The first failing step
        short-circuits with a rejected outcome.

        Never raises for sender-caused cryptographic or protocol failures —
        returns an outcome with ``rejected=True`` and populated
        ``response_headers`` so the caller can convert to an HTTP response
        without try/except around every call. Operational failures inside
        the dedup backend or verify-options factory MAY still raise; wrap
        the call if you need to 5xx cleanly on internal errors.

        :param method: HTTP method of the incoming request.
        :param url: URL the request was sent to.
        :param headers: request headers.
        :param body: raw request body bytes (the bytes that were signed).
        :returns: the outcome; ``duplicate=True`` marks an already-seen
            ``(sender, idempotency_key)`` pair.
        """
        if not _content_type_is_json(headers):
            return _reject("content_type_invalid", sender_identity=None)

        signer, rejection = await self._verify(method=method, url=url, headers=headers, body=body)
        if rejection is not None:
            return rejection
        assert signer is not None  # verification succeeded

        sender_id = signer.as_sender_identity()

        try:
            payload_dict = json.loads(body)
        except (json.JSONDecodeError, UnicodeDecodeError, RecursionError):
            # ``json.loads`` on bytes decodes them first, so a body that is
            # not valid UTF-8/16/32 raises UnicodeDecodeError rather than
            # JSONDecodeError, and a pathologically nested document raises
            # RecursionError. Both are sender-caused and must be reported as
            # a rejection, not an exception.
            return _reject("body_invalid_json", sender_identity=sender_id)
        if not isinstance(payload_dict, dict):
            # Valid JSON, but the top level is not an object.
            return _reject("body_invalid_json", sender_identity=sender_id)

        idempotency_key = payload_dict.get("idempotency_key")
        if not isinstance(idempotency_key, str) or not idempotency_key:
            # Spec 3.0-rc: idempotency_key is REQUIRED on every webhook payload.
            return _reject("idempotency_key_missing", sender_identity=sender_id)
        if not _IDEMPOTENCY_KEY_RE.match(idempotency_key):
            # Non-conformant format — charset or length out of bounds.
            return _reject("idempotency_key_invalid", sender_identity=sender_id)

        parsed = self._parse(payload_dict)
        if parsed is None:
            return _reject("payload_invalid", sender_identity=sender_id)

        # Dedup runs last so that rejected payloads never consume a key.
        is_first_seen = await self._config.dedup.check_and_record(
            sender_id=sender_id, idempotency_key=idempotency_key
        )

        return WebhookOutcome(
            sender_identity=sender_id,
            payload=parsed,
            duplicate=not is_first_seen,
            idempotency_key=idempotency_key,
        )

    def receive_sync(
        self,
        *,
        method: str,
        url: str,
        headers: Mapping[str, str],
        body: bytes,
    ) -> WebhookOutcome:
        """Synchronous wrapper around :meth:`receive` for WSGI-style frameworks.

        Use this from Flask, Gunicorn sync workers, ``http.server``, or any
        other sync-only HTTP entry point where wrapping every call in
        ``asyncio.run(...)`` is just noise::

            @app.post("/webhooks/adcp")
            def hook():
                outcome = receiver.receive_sync(
                    method=request.method,
                    url=request.url,
                    headers=dict(request.headers),
                    body=request.get_data(),
                )
                ...

        Raises :class:`RuntimeError` if invoked from a thread that already has
        a running event loop — the underlying verify / dedup path is async and
        cannot be driven from inside an active loop without blocking it. From
        async code, call :meth:`receive` directly.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop in this thread — safe to spin one up below.
            pass
        else:
            raise RuntimeError(
                "WebhookReceiver.receive_sync() cannot be called from a running "
                "event loop. Use `await receiver.receive(...)` instead."
            )
        # Run outside the ``except`` block: otherwise any exception raised by
        # ``receive`` is implicitly chained to the "no running event loop"
        # probe error and the traceback points at the wrong cause.
        return asyncio.run(self.receive(method=method, url=url, headers=headers, body=body))

    async def _verify(
        self,
        *,
        method: str,
        url: str,
        headers: Mapping[str, str],
        body: bytes,
    ) -> tuple[VerifiedSignerLike | None, WebhookOutcome | None]:
        """Authenticate the request via RFC 9421 or the legacy HMAC fallback.

        Returns ``(signer, None)`` on success or ``(None, rejection_outcome)``
        on failure. The HMAC path is tried when no 9421 headers are present,
        or when 9421 verification failed and the configured fallback has
        ``only_when_9421_absent`` set to a falsy value.
        """
        has_9421 = _has_9421_headers(headers)

        if has_9421:
            try:
                signer = verify_webhook_signature(
                    method=method,
                    url=url,
                    headers=headers,
                    body=body,
                    options=self._config.verify_options,
                )
                return signer, None
            except SignatureVerificationError as exc:
                # Downgrade defense: when 9421 IS present but fails, do NOT
                # consult HMAC fallback by default. A MITM that stripped a
                # valid 9421 signature and replaced it with a forged HMAC one
                # is exactly what the downgrade guard exists for.
                fallback = self._config.legacy_hmac
                allow_hmac = fallback is not None and not fallback.only_when_9421_absent
                if not allow_hmac:
                    return None, WebhookOutcome(
                        rejected=True,
                        rejection_reason="signature_invalid",
                        response_headers=_www_authenticate_header(exc.code),
                    )
                logger.warning(
                    "9421 webhook verify failed (%s); trying HMAC legacy because "
                    "legacy_hmac.only_when_9421_absent=False is set",
                    exc.code,
                )

        fallback = self._config.legacy_hmac
        if fallback is None:
            # No 9421 headers AND no HMAC fallback configured → spec says 9421
            # is baseline-required in 3.0, so this is non-conformant.
            return None, WebhookOutcome(
                rejected=True,
                rejection_reason="signature_missing",
                response_headers=_www_authenticate_header("webhook_signature_required"),
            )

        hmac_options = fallback.options_for(headers)
        if hmac_options is None:
            # The fallback produced no options for these headers; reported
            # the same way as a missing signature.
            return None, WebhookOutcome(
                rejected=True,
                rejection_reason="signature_missing",
                response_headers=_www_authenticate_header("webhook_signature_required"),
            )
        try:
            legacy_signer = verify_webhook_hmac(headers=headers, body=body, options=hmac_options)
            return legacy_signer, None
        except LegacyWebhookHmacError:
            return None, WebhookOutcome(
                rejected=True,
                rejection_reason="signature_legacy_failed",
                response_headers=_www_authenticate_header("webhook_signature_invalid"),
            )

    def _parse(self, payload_dict: dict[str, Any]) -> WebhookPayload | None:
        """Validate ``payload_dict`` against the model for ``config.kind``.

        Returns the validated model, or ``None`` (after logging the
        field-level errors) when validation fails.
        """
        model = _MODEL_BY_KIND[self._config.kind]
        try:
            return cast(WebhookPayload, model.model_validate(payload_dict))
        except ValidationError as exc:
            # Operators need the field-level reason to diagnose sender bugs.
            # The receiver still returns payload_invalid downstream; this is
            # just observability.
            logger.warning(
                "webhook payload failed %s validation: %s",
                self._config.kind,
                exc.errors(include_url=False),
            )
            return None

Stateless webhook entry point, one instance per receiver configuration.

Instance state (config) is read-only after construction. Per-request state lives in the :class:WebhookOutcome returned from :meth:receive.

Methods

async def receive(self, *, method: str, url: str, headers: Mapping[str, str], body: bytes) ‑> WebhookOutcome
Expand source code
async def receive(
    self,
    *,
    method: str,
    url: str,
    headers: Mapping[str, str],
    body: bytes,
) -> WebhookOutcome:
    """Verify, dedupe, parse. Returns a :class:`WebhookOutcome`.

    Steps, in order: content-type check, signature verification, JSON
    decode, ``idempotency_key`` presence/format check, payload model
    validation, and finally the dedup lookup. The first failing step
    short-circuits with a rejected outcome.

    Never raises for sender-caused cryptographic or protocol failures —
    returns an outcome with ``rejected=True`` and populated
    ``response_headers`` so the caller can convert to an HTTP response
    without try/except around every call. Operational failures inside
    the dedup backend or verify-options factory MAY still raise; wrap
    the call if you need to 5xx cleanly on internal errors.

    :param method: HTTP method of the incoming request.
    :param url: URL the request was sent to.
    :param headers: request headers.
    :param body: raw request body bytes (the bytes that were signed).
    :returns: the outcome; ``duplicate=True`` marks an already-seen
        ``(sender, idempotency_key)`` pair.
    """
    if not _content_type_is_json(headers):
        return _reject("content_type_invalid", sender_identity=None)

    signer, rejection = await self._verify(method=method, url=url, headers=headers, body=body)
    if rejection is not None:
        return rejection
    assert signer is not None  # verification succeeded

    sender_id = signer.as_sender_identity()

    try:
        payload_dict = json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError, RecursionError):
        # ``json.loads`` on bytes decodes them first, so a body that is not
        # valid UTF-8/16/32 raises UnicodeDecodeError rather than
        # JSONDecodeError, and a pathologically nested document raises
        # RecursionError. Both are sender-caused and must be reported as a
        # rejection, not an exception.
        return _reject("body_invalid_json", sender_identity=sender_id)
    if not isinstance(payload_dict, dict):
        # Valid JSON, but the top level is not an object.
        return _reject("body_invalid_json", sender_identity=sender_id)

    idempotency_key = payload_dict.get("idempotency_key")
    if not isinstance(idempotency_key, str) or not idempotency_key:
        # Spec 3.0-rc: idempotency_key is REQUIRED on every webhook payload.
        return _reject("idempotency_key_missing", sender_identity=sender_id)
    if not _IDEMPOTENCY_KEY_RE.match(idempotency_key):
        # Non-conformant format — charset or length out of bounds.
        return _reject("idempotency_key_invalid", sender_identity=sender_id)

    parsed = self._parse(payload_dict)
    if parsed is None:
        return _reject("payload_invalid", sender_identity=sender_id)

    # Dedup runs last so that rejected payloads never consume a key.
    is_first_seen = await self._config.dedup.check_and_record(
        sender_id=sender_id, idempotency_key=idempotency_key
    )

    return WebhookOutcome(
        sender_identity=sender_id,
        payload=parsed,
        duplicate=not is_first_seen,
        idempotency_key=idempotency_key,
    )

Verify, dedupe, parse. Returns a :class:WebhookOutcome.

Never raises for sender-caused cryptographic or protocol failures — returns an outcome with rejected=True and populated response_headers so the caller can convert to an HTTP response without try/except around every call. Operational failures inside the dedup backend or verify-options factory MAY still raise; wrap the call if you need to 5xx cleanly on internal errors.

def receive_sync(self, *, method: str, url: str, headers: Mapping[str, str], body: bytes) ‑> WebhookOutcome
Expand source code
def receive_sync(
    self,
    *,
    method: str,
    url: str,
    headers: Mapping[str, str],
    body: bytes,
) -> WebhookOutcome:
    """Synchronous wrapper around :meth:`receive` for WSGI-style frameworks.

    Use this from Flask, Gunicorn sync workers, ``http.server``, or any
    other sync-only HTTP entry point where wrapping every call in
    ``asyncio.run(...)`` is just noise::

        @app.post("/webhooks/adcp")
        def hook():
            outcome = receiver.receive_sync(
                method=request.method,
                url=request.url,
                headers=dict(request.headers),
                body=request.get_data(),
            )
            ...

    Raises :class:`RuntimeError` if invoked from a thread that already has
    a running event loop — the underlying verify / dedup path is async and
    cannot be driven from inside an active loop without blocking it. From
    async code, call :meth:`receive` directly.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in this thread — safe to spin one up below.
        pass
    else:
        raise RuntimeError(
            "WebhookReceiver.receive_sync() cannot be called from a running "
            "event loop. Use `await receiver.receive(...)` instead."
        )
    # Run outside the ``except`` block: otherwise any exception raised by
    # ``receive`` is implicitly chained to the "no running event loop" probe
    # error and the traceback points at the wrong cause.
    return asyncio.run(self.receive(method=method, url=url, headers=headers, body=body))

Synchronous wrapper around :meth:receive for WSGI-style frameworks.

Use this from Flask, Gunicorn sync workers, http.server, or any other sync-only HTTP entry point where wrapping every call in asyncio.run(…) is just noise::

@app.post("/webhooks/adcp")
def hook():
    outcome = receiver.receive_sync(
        method=request.method,
        url=request.url,
        headers=dict(request.headers),
        body=request.get_data(),
    )
    ...

Raises :class:RuntimeError if invoked from a thread that already has a running event loop — the underlying verify / dedup path is async and cannot be driven from inside an active loop without blocking it. From async code, call :meth:receive directly.

class WebhookReceiverConfig (verify_options: WebhookVerifyOptions,
dedup: WebhookDedupStore,
legacy_hmac: LegacyHmacFallback | None = None,
kind: WebhookKind = 'mcp')
Expand source code
@dataclass(frozen=True)
class WebhookReceiverConfig:
    """Immutable configuration bundle for a webhook receiver.

    :param verify_options: verifier configuration (JWKS, replay store, etc.).
        A single instance is reused for every request — the verifier stamps
        ``now`` itself via ``verify_options.clock()``, so there's no need to
        refresh a time field per request.
    :param dedup: webhook-dedup store.
    :param legacy_hmac: optional HMAC-SHA256 fallback for 3.x migration.
        ``None`` (the default) means no fallback is configured.
    :param kind: which webhook payload type to parse into. Default ``"mcp"``
        (the task-status webhook that dominates most integrations); pass
        explicitly for list-change / artifact / revocation receivers.
    """

    # Required: signature-verification settings shared across requests.
    verify_options: WebhookVerifyOptions
    # Required: store used to detect retried deliveries.
    dedup: WebhookDedupStore
    # Optional legacy HMAC fallback; None disables it.
    legacy_hmac: LegacyHmacFallback | None = None
    # Selects the payload model that incoming bodies are validated against.
    kind: WebhookKind = "mcp"

Configuration bundle.

:param verify_options: verifier configuration (JWKS, replay store, etc.). A single instance is reused for every request — the verifier stamps now itself via verify_options.clock(), so there's no need to refresh a time field per request. :param dedup: webhook-dedup store. :param legacy_hmac: optional HMAC-SHA256 fallback for 3.x migration. :param kind: which webhook payload type to parse into. Default "mcp" (the task-status webhook that dominates most integrations); pass explicitly for list-change / artifact / revocation receivers.

Instance variables

var dedup : WebhookDedupStore
var kind : Literal['mcp', 'revocation_notification', 'collection_list_changed', 'property_list_changed', 'artifact']
var legacy_hmac : LegacyHmacFallback | None
var verify_options : WebhookVerifyOptions
class WebhookSender (*,
private_key: PrivateKey,
key_id: str,
alg: str,
client: httpx.AsyncClient | None = None,
timeout_seconds: float = 10.0)
Expand source code
class WebhookSender:
    """Outbound signed-webhook delivery client.

    Owns one webhook-signing private key. Reuses a single :class:`httpx.AsyncClient`
    across requests for connection pooling — pass your own via ``client=`` if
    you want to share it with other SDK surfaces.

    Thread/task safety: safe to call concurrent ``send_*`` from many asyncio
    tasks. The underlying ``httpx.AsyncClient`` manages its own pool.
    """

    def __init__(
        self,
        *,
        private_key: PrivateKey,
        key_id: str,
        alg: str,
        client: httpx.AsyncClient | None = None,
        timeout_seconds: float = _DEFAULT_TIMEOUT_SECONDS,
    ) -> None:
        """Bind a signing key and an optional shared HTTP client to the sender.

        Nothing is validated here and no network resource is created; the
        values are only stored.

        Args:
            private_key: Key handed to ``sign_webhook`` on every delivery.
            key_id: Key identifier passed to the signer as ``key_id``.
            alg: Algorithm name passed to the signer as ``alg``.
            client: Caller-supplied ``httpx.AsyncClient``. When omitted, the
                sender creates one lazily and closes it in :meth:`aclose`.
            timeout_seconds: Timeout used only for the lazily created client.
        """
        # Transport state. A client passed in by the caller is borrowed, so
        # ownership is decided once, here, from whether one was supplied.
        self._owns_client = client is None
        self._client = client
        self._timeout = timeout_seconds

        # Signing identity.
        self._alg = alg
        self._key_id = key_id
        self._private_key = private_key

    @classmethod
    def from_jwk(
        cls,
        jwk: Mapping[str, Any],
        *,
        d_field: str = "d",
        client: httpx.AsyncClient | None = None,
        timeout_seconds: float = _DEFAULT_TIMEOUT_SECONDS,
    ) -> WebhookSender:
        """Construct from a JWK that includes the private scalar.

        The JWK MUST have ``adcp_use == "webhook-signing"``. This is enforced
        here: any other value (including a missing ``adcp_use``) raises
        ``ValueError`` at construction, because a key with the wrong
        ``adcp_use`` would be rejected by every conformant verifier and
        webhook-signing keys must stay distinct from request-signing keys.

        The JWK ``alg`` may be the JOSE name (``EdDSA`` / ``ES256``) or the
        already-normalized name (``ed25519`` / ``ecdsa-p256-sha256``).

        Args:
            jwk: The private JWK. It is copied once up front, so later
                mutation of the mapping does not affect the sender.
            d_field: Forwarded to ``private_key_from_jwk`` as ``d_field``.
            client: Optional pre-built ``httpx.AsyncClient`` to share.
            timeout_seconds: Timeout for the sender-owned client.

        Returns:
            A sender whose ``key_id`` is ``str(jwk["kid"])``.

        Raises:
            ValueError: ``adcp_use`` is not ``"webhook-signing"``, or ``alg``
                is not one of the supported algorithms.
            KeyError: the JWK has no ``kid`` member.
        """
        # Snapshot the mapping once — a live Mapping could otherwise return
        # different values across the adcp_use / kid / d / alg reads.
        jwk_snapshot = dict(jwk)
        if jwk_snapshot.get("adcp_use") != "webhook-signing":
            raise ValueError(
                f"WebhookSender requires a JWK with adcp_use='webhook-signing' "
                f"(got {jwk_snapshot.get('adcp_use')!r}). Webhook-signing and "
                f"request-signing keys MUST be distinct so a signature from one "
                f"surface cannot be replayed as the other. Generate a separate "
                f"key with adcp_use='webhook-signing' and publish it in your "
                f"adagents.json alongside your request-signing key. See "
                f"https://adcontextprotocol.org/docs/building/implementation/security"
            )
        # Map JOSE algorithm names onto the names used for signing; anything
        # already in that form passes through unchanged.
        alg = jwk_snapshot.get("alg")
        if alg == "EdDSA":
            alg = "ed25519"
        elif alg == "ES256":
            alg = "ecdsa-p256-sha256"
        if alg not in ("ed25519", "ecdsa-p256-sha256"):
            # Report the value the caller actually supplied.
            raise ValueError(f"unsupported JWK alg {jwk_snapshot.get('alg')!r}")
        private_key = private_key_from_jwk(jwk_snapshot, d_field=d_field)
        return cls(
            private_key=private_key,
            key_id=str(jwk_snapshot["kid"]),
            alg=alg,
            client=client,
            timeout_seconds=timeout_seconds,
        )

    @classmethod
    def from_pem(
        cls,
        pem_path: str | Path | bytes,
        *,
        key_id: str,
        alg: str = "ed25519",
        passphrase: bytes | None = None,
        client: httpx.AsyncClient | None = None,
        timeout_seconds: float = _DEFAULT_TIMEOUT_SECONDS,
    ) -> WebhookSender:
        """Load a private key from a PEM file and bind it as a webhook sender.

        Companion to ``adcp-keygen --purpose webhook-signing``, which writes
        the PEM and prints the public JWK. The JWK is published at your
        ``jwks_uri``; the PEM holds the private key material. ``from_pem``
        reads the PEM, checks that the key matches ``alg``, and returns a
        sender ready to send.

        Args:
            pem_path: Path to the PKCS#8 PEM, or the PEM bytes directly.
            key_id: JWK ``kid`` claim — must match the published JWK.
            alg: Signature algorithm. ``ed25519`` (default) or ``es256``.
                Also accepts the RFC 9421 form ``ecdsa-p256-sha256`` and the
                JOSE spellings ``ES256`` / ``EdDSA``.
            passphrase: Required if the PEM is encrypted
                (``adcp-keygen --encrypt``).
            client: Optional pre-built :class:`httpx.AsyncClient` to share
                across the SDK; the sender owns its own client when omitted.
            timeout_seconds: Per-request timeout for the owned client.

        Raises:
            ValueError: ``alg`` is not ed25519 / es256, or the PEM contains
                a key whose type (or, for es256, curve) doesn't match ``alg``.
        """
        # Normalize the accepted aliases before validating.
        if alg in ("es256", "ES256"):
            alg = ALG_ES256
        elif alg == "EdDSA":
            alg = ALG_ED25519
        if alg not in (ALG_ED25519, ALG_ES256):
            raise ValueError(
                f"unsupported alg {alg!r} — use 'ed25519' or 'es256' "
                f"(the two AdCP webhook-signing algorithms)"
            )

        if isinstance(pem_path, bytes):
            pem_bytes = pem_path
        else:
            pem_bytes = Path(pem_path).read_bytes()

        private_key = load_private_key_pem(pem_bytes, password=passphrase)

        # The PEM's key type must match the requested alg — mixing them
        # would produce signatures no verifier can validate, and the
        # resulting error at delivery time would point at the receiver.
        # Fail here so the misconfiguration surfaces at construction.
        if alg == ALG_ED25519 and not isinstance(private_key, ed25519.Ed25519PrivateKey):
            raise ValueError(
                f"PEM holds a {type(private_key).__name__} but alg='ed25519' "
                f"was requested. Re-run adcp-keygen with --alg ed25519, or "
                f"pass alg='es256' to match the existing PEM."
            )
        if alg == ALG_ES256:
            if not isinstance(private_key, ec.EllipticCurvePrivateKey):
                raise ValueError(
                    f"PEM holds a {type(private_key).__name__} but alg='es256' "
                    f"was requested. Re-run adcp-keygen with --alg es256, or "
                    f"pass alg='ed25519' to match the existing PEM."
                )
            # Being an EC key is not enough: es256 is ECDSA over P-256, so a
            # key on any other curve (e.g. P-384) has the same
            # "no verifier can validate it" problem as a wrong key type.
            if not isinstance(private_key.curve, ec.SECP256R1):
                raise ValueError(
                    f"PEM holds an EC key on curve {private_key.curve.name!r} "
                    f"but alg='es256' requires P-256 (secp256r1). Re-run "
                    f"adcp-keygen with --alg es256 to generate a P-256 key."
                )

        return cls(
            private_key=private_key,
            key_id=key_id,
            alg=alg,
            client=client,
            timeout_seconds=timeout_seconds,
        )

    def __repr__(self) -> str:
        """Return a log-safe summary showing only the key id and algorithm."""
        # Hand-written on purpose: an auto-generated repr (or a debug helper
        # dumping self.__dict__) would pull the private key into logs and
        # tracebacks.
        key_id, alg = self._key_id, self._alg
        return f"WebhookSender(key_id={key_id!r}, alg={alg!r})"

    async def aclose(self) -> None:
        """Close the HTTP client, but only if this sender created it.

        A caller-supplied client is left open. After closing, the reference
        is cleared so the next send lazily builds a new client.
        """
        owned_client = self._client if self._owns_client else None
        if owned_client is None:
            return
        await owned_client.aclose()
        self._client = None

    async def __aenter__(self) -> WebhookSender:
        """Enter ``async with``: ensure an HTTP client exists, then return self."""
        _ = await self._get_client()
        return self

    async def __aexit__(self, *args: Any) -> None:
        """Leave ``async with`` by delegating to :meth:`aclose`.

        The exception details passed in ``args`` are ignored, and ``None`` is
        returned, so an in-flight exception is never suppressed.
        """
        closing = self.aclose()
        await closing

    async def _get_client(self) -> httpx.AsyncClient:
        """Return the HTTP client, creating one on first use.

        A lazily created client is built with the configured timeout and
        cached on the instance for later calls.
        """
        current = self._client
        if current is None:
            current = httpx.AsyncClient(timeout=self._timeout)
            self._client = current
        return current

    async def send_mcp(
        self,
        *,
        url: str,
        task_id: str,
        status: GeneratedTaskStatus | str,
        task_type: str | None = None,
        result: AdcpAsyncResponseData | dict[str, Any] | None = None,
        timestamp: datetime | None = None,
        operation_id: str | None = None,
        message: str | None = None,
        context_id: str | None = None,
        domain: str | None = None,
        idempotency_key: str | None = None,
        extra_headers: Mapping[str, str] | None = None,
    ) -> WebhookDeliveryResult:
        """POST a signed MCP-style task-status webhook.

        The payload is built by ``create_mcp_webhook_payload`` from every
        argument except ``url`` and ``extra_headers``, then delivered through
        :meth:`send_raw` using the ``idempotency_key`` found in that payload.

        On retry, prefer :meth:`resend` over calling this again — ``resend``
        replays the exact same bytes, whereas re-invoking ``send_mcp`` with
        the "same" args would produce a fresh ``timestamp`` and potentially
        a different serialized body, which the receiver would dedupe but
        with different observed payload data.
        """
        payload_fields = dict(
            task_id=task_id,
            status=status,
            task_type=task_type,
            result=result,
            timestamp=timestamp,
            operation_id=operation_id,
            message=message,
            context_id=context_id,
            domain=domain,
            idempotency_key=idempotency_key,
        )
        mcp_payload = create_mcp_webhook_payload(**payload_fields)
        # Read the key back from the built payload rather than from the
        # argument, which may be None.
        delivery_key = str(mcp_payload["idempotency_key"])
        return await self.send_raw(
            url=url,
            idempotency_key=delivery_key,
            payload=mcp_payload,
            extra_headers=extra_headers,
        )

    async def send_revocation_notification(
        self,
        *,
        url: str,
        rights_id: str,
        brand_id: str,
        reason: str,
        effective_at: str,
        idempotency_key: str | None = None,
        extra_headers: Mapping[str, str] | None = None,
    ) -> WebhookDeliveryResult:
        """POST a signed rights-revocation notification.

        ``rights_id``, ``brand_id``, ``reason`` and ``effective_at`` are copied
        into the payload under the same names. A falsy ``idempotency_key`` is
        replaced with a freshly generated one. Delivery goes through
        :meth:`send_raw`, whose result is returned.
        """
        effective_key = (
            idempotency_key if idempotency_key else generate_webhook_idempotency_key()
        )
        return await self.send_raw(
            url=url,
            idempotency_key=effective_key,
            payload={
                "idempotency_key": effective_key,
                "rights_id": rights_id,
                "brand_id": brand_id,
                "reason": reason,
                "effective_at": effective_at,
            },
            extra_headers=extra_headers,
        )

    async def send_artifact_webhook(
        self,
        *,
        url: str,
        media_buy_id: str,
        batch_id: str,
        timestamp: str,
        artifacts: list[dict[str, Any]],
        idempotency_key: str | None = None,
        extra_headers: Mapping[str, str] | None = None,
    ) -> WebhookDeliveryResult:
        """POST a signed content-standards artifact webhook.

        ``media_buy_id``, ``batch_id``, ``timestamp`` and ``artifacts`` are
        copied into the payload under the same names. A falsy
        ``idempotency_key`` is replaced with a freshly generated one. Delivery
        goes through :meth:`send_raw`, whose result is returned.
        """
        effective_key = (
            idempotency_key if idempotency_key else generate_webhook_idempotency_key()
        )
        return await self.send_raw(
            url=url,
            idempotency_key=effective_key,
            payload={
                "idempotency_key": effective_key,
                "media_buy_id": media_buy_id,
                "batch_id": batch_id,
                "timestamp": timestamp,
                "artifacts": artifacts,
            },
            extra_headers=extra_headers,
        )

    async def send_collection_list_changed(
        self,
        *,
        url: str,
        list_id: str,
        resolved_at: str,
        signature: str,
        idempotency_key: str | None = None,
        extra_headers: Mapping[str, str] | None = None,
    ) -> WebhookDeliveryResult:
        """POST a signed governance collection-list-changed webhook.

        ``signature`` is the payload-level signature field that predates 9421
        webhook transport signing — it remains required by the schema. The
        9421 signature this method adds protects the transport envelope.

        The payload's ``event`` is fixed to ``"collection_list_changed"``. A
        falsy ``idempotency_key`` is replaced with a freshly generated one.
        Delivery goes through :meth:`send_raw`, whose result is returned.
        """
        effective_key = (
            idempotency_key if idempotency_key else generate_webhook_idempotency_key()
        )
        return await self.send_raw(
            url=url,
            idempotency_key=effective_key,
            payload={
                "idempotency_key": effective_key,
                "event": "collection_list_changed",
                "list_id": list_id,
                "resolved_at": resolved_at,
                "signature": signature,
            },
            extra_headers=extra_headers,
        )

    async def send_property_list_changed(
        self,
        *,
        url: str,
        list_id: str,
        resolved_at: str,
        signature: str,
        idempotency_key: str | None = None,
        extra_headers: Mapping[str, str] | None = None,
    ) -> WebhookDeliveryResult:
        """POST a signed governance property-list-changed webhook.

        ``list_id``, ``resolved_at`` and ``signature`` are copied into the
        payload under the same names, and ``event`` is fixed to
        ``"property_list_changed"``. A falsy ``idempotency_key`` is replaced
        with a freshly generated one. Delivery goes through :meth:`send_raw`,
        whose result is returned.
        """
        effective_key = (
            idempotency_key if idempotency_key else generate_webhook_idempotency_key()
        )
        return await self.send_raw(
            url=url,
            idempotency_key=effective_key,
            payload={
                "idempotency_key": effective_key,
                "event": "property_list_changed",
                "list_id": list_id,
                "resolved_at": resolved_at,
                "signature": signature,
            },
            extra_headers=extra_headers,
        )

    async def send_raw(
        self,
        *,
        url: str,
        idempotency_key: str,
        payload: dict[str, Any],
        extra_headers: Mapping[str, str] | None = None,
    ) -> WebhookDeliveryResult:
        """Sign and POST an arbitrary payload — the low-level escape hatch.

        ``idempotency_key`` is a required keyword so the contract is visible in
        the signature rather than hidden in a runtime dict check. It is written
        into a copy of ``payload`` before signing and overrides any
        ``idempotency_key`` the payload already carries, so the two cannot
        disagree. The caller's dict is not modified.

        Raises:
            ValueError: ``idempotency_key`` is not a non-empty ``str``, or the
                serialized body is larger than ``_MAX_BODY_BYTES``.
        """
        key_is_usable = isinstance(idempotency_key, str) and bool(idempotency_key)
        if not key_is_usable:
            raise ValueError("idempotency_key must be a non-empty string")

        envelope = {**payload}
        envelope["idempotency_key"] = idempotency_key

        # Serialize exactly once. These bytes are both what gets signed and
        # what gets posted; letting httpx re-encode via `json=` anywhere in
        # the stack would change the bytes and break the digest.
        wire_bytes = json.dumps(envelope).encode("utf-8")
        wire_size = len(wire_bytes)
        if wire_size > _MAX_BODY_BYTES:
            raise ValueError(
                f"serialized webhook body is {wire_size:,} bytes, over the "
                f"{_MAX_BODY_BYTES:,}-byte cap. Split into smaller webhooks "
                "or use batch-reporting endpoints."
            )
        return await self._send_bytes(
            url=url,
            body=wire_bytes,
            idempotency_key=idempotency_key,
            extra_headers=extra_headers,
        )

    async def resend(self, result: WebhookDeliveryResult) -> WebhookDeliveryResult:
        """Replay an earlier delivery under a fresh signature.

        The captured body is posted again byte-for-byte (same
        ``idempotency_key``, same payload fields, same serialization) to the
        original URL with the original extra headers. Only the Signature /
        Signature-Input / Content-Digest headers are regenerated. The receiver
        dedupes via ``idempotency_key``, so the replayed event is a
        spec-correct retry that won't cause double-processing.

        Raises:
            ValueError: ``result`` carries no captured ``sent_body``.
        """
        captured_body = result.sent_body
        if not captured_body:
            raise ValueError(
                "cannot resend: result has no captured sent_body (likely constructed "
                "externally). Call a send_* method on this sender first."
            )
        # An empty header mapping is passed on as None.
        replay_headers = result.sent_extra_headers or None
        return await self._send_bytes(
            url=result.url,
            body=captured_body,
            idempotency_key=result.idempotency_key,
            extra_headers=replay_headers,
        )

    async def _send_bytes(
        self,
        *,
        url: str,
        body: bytes,
        idempotency_key: str,
        extra_headers: Mapping[str, str] | None,
    ) -> WebhookDeliveryResult:
        """Sign and POST an already-serialized body.

        Shared by :meth:`send_raw` and :meth:`resend`. The body is signed
        together with the ``Content-Type`` header, then ``extra_headers`` are
        merged on top and the exact same bytes are posted.

        Raises:
            ValueError: ``extra_headers`` names ``Signature``,
                ``Signature-Input``, ``Content-Digest`` or ``Content-Type``
                (compared case-insensitively).
        """
        content_type = {"Content-Type": "application/json"}
        signature_headers = sign_webhook(
            method="POST",
            url=url,
            headers=content_type,
            body=body,
            private_key=self._private_key,
            key_id=self._key_id,
            alg=self._alg,
        )
        outgoing: dict[str, str] = dict(content_type)
        outgoing.update(signature_headers.as_dict())

        if extra_headers:
            # Check every name before merging any, so a rejected header never
            # leaves the outgoing set half-updated.
            protected = {"signature", "signature-input", "content-digest", "content-type"}
            for name in extra_headers:
                if str(name).lower() not in protected:
                    continue
                raise ValueError(
                    f"extra_headers may not override signature-binding or "
                    f"content-type header {name!r}"
                )
            for name, value in extra_headers.items():
                outgoing[name] = value

        http = await self._get_client()
        # `content=` sends the signed bytes untouched.
        reply = await http.post(url, content=body, headers=outgoing)
        # Keep the sent bytes and extra headers on the result; resend() reads
        # them back to replay the delivery.
        remembered_headers = dict(extra_headers) if extra_headers else {}
        return WebhookDeliveryResult(
            status_code=reply.status_code,
            idempotency_key=idempotency_key,
            url=url,
            response_headers=dict(reply.headers),
            response_body=reply.content,
            sent_body=body,
            sent_extra_headers=remembered_headers,
        )

Outbound signed-webhook delivery client.

Owns one webhook-signing private key. Reuses a single httpx.AsyncClient across requests for connection pooling — pass your own via client= if you want to share it with other SDK surfaces.

Thread/task safety: safe to call concurrent send_* from many asyncio tasks. The underlying httpx.AsyncClient manages its own pool.

Static methods

def from_jwk(jwk: Mapping[str, Any],
*,
d_field: str = 'd',
client: httpx.AsyncClient | None = None,
timeout_seconds: float = 10.0) ‑> WebhookSender

Construct from a JWK that includes the private scalar.

The JWK MUST have adcp_use == "webhook-signing". The sender enforces this at construction: any other value (including a missing adcp_use) raises ValueError, because webhook-signing and request-signing keys must be distinct and a key whose adcp_use is wrong would be rejected by every conformant verifier.

def from_pem(pem_path: str | Path | bytes,
*,
key_id: str,
alg: str = 'ed25519',
passphrase: bytes | None = None,
client: httpx.AsyncClient | None = None,
timeout_seconds: float = 10.0) ‑> WebhookSender

Load a private key from a PEM file and bind it as a webhook sender.

Companion to adcp-keygen --purpose webhook-signing, which writes the PEM and prints the public JWK. The JWK is published at your jwks_uri; the PEM holds the private key material. from_pem reads the PEM, constructs the right PrivateKey type for alg, and returns a sender ready to send.

Args

pem_path
Path to the PKCS#8 PEM, or the PEM bytes directly.
key_id
JWK kid claim — must match the published JWK.
alg
Signature algorithm. ed25519 (default) or es256. Also accepts the RFC 9421 form ecdsa-p256-sha256.
passphrase
Required if the PEM is encrypted (adcp-keygen --encrypt).
client
Optional pre-built httpx.AsyncClient to share across the SDK; the sender owns its own client when omitted.
timeout_seconds
Per-request timeout for the owned client.

Raises

ValueError
alg is not ed25519 / es256, or the PEM contains a key whose type doesn't match alg.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the HTTP client, but only if this sender created it.

    A caller-supplied client is left open. After closing, the reference is
    cleared so the next send lazily builds a new client.
    """
    owned_client = self._client if self._owns_client else None
    if owned_client is None:
        return
    await owned_client.aclose()
    self._client = None

Close the internal httpx client if we own it.

async def resend(self,
result: WebhookDeliveryResult) ‑> WebhookDeliveryResult
Expand source code
async def resend(self, result: WebhookDeliveryResult) -> WebhookDeliveryResult:
    """Replay an earlier delivery under a fresh signature.

    The captured body is posted again byte-for-byte (same
    ``idempotency_key``, same payload fields, same serialization) to the
    original URL with the original extra headers. Only the Signature /
    Signature-Input / Content-Digest headers are regenerated. The receiver
    dedupes via ``idempotency_key``, so the replayed event is a spec-correct
    retry that won't cause double-processing.

    Raises:
        ValueError: ``result`` carries no captured ``sent_body``.
    """
    captured_body = result.sent_body
    if not captured_body:
        raise ValueError(
            "cannot resend: result has no captured sent_body (likely constructed "
            "externally). Call a send_* method on this sender first."
        )
    # An empty header mapping is passed on as None.
    replay_headers = result.sent_extra_headers or None
    return await self._send_bytes(
        url=result.url,
        body=captured_body,
        idempotency_key=result.idempotency_key,
        extra_headers=replay_headers,
    )

Replay an earlier delivery under a fresh signature.

The bytes are identical (same idempotency_key, same payload fields, same serialization) — only the Signature / Signature-Input / Content-Digest headers are regenerated. The receiver dedupes via idempotency_key, so the replayed event is a spec-correct retry that won't cause double-processing.

async def send_artifact_webhook(self,
*,
url: str,
media_buy_id: str,
batch_id: str,
timestamp: str,
artifacts: list[dict[str, Any]],
idempotency_key: str | None = None,
extra_headers: Mapping[str, str] | None = None) ‑> WebhookDeliveryResult
Expand source code
async def send_artifact_webhook(
    self,
    *,
    url: str,
    media_buy_id: str,
    batch_id: str,
    timestamp: str,
    artifacts: list[dict[str, Any]],
    idempotency_key: str | None = None,
    extra_headers: Mapping[str, str] | None = None,
) -> WebhookDeliveryResult:
    """POST a signed content-standards artifact webhook.

    ``media_buy_id``, ``batch_id``, ``timestamp`` and ``artifacts`` are copied
    into the payload under the same names. A falsy ``idempotency_key`` is
    replaced with a freshly generated one. Delivery goes through
    :meth:`send_raw`, whose result is returned.
    """
    effective_key = (
        idempotency_key if idempotency_key else generate_webhook_idempotency_key()
    )
    return await self.send_raw(
        url=url,
        idempotency_key=effective_key,
        payload={
            "idempotency_key": effective_key,
            "media_buy_id": media_buy_id,
            "batch_id": batch_id,
            "timestamp": timestamp,
            "artifacts": artifacts,
        },
        extra_headers=extra_headers,
    )

POST a signed content-standards artifact webhook.

async def send_collection_list_changed(self,
*,
url: str,
list_id: str,
resolved_at: str,
signature: str,
idempotency_key: str | None = None,
extra_headers: Mapping[str, str] | None = None) ‑> WebhookDeliveryResult
Expand source code
async def send_collection_list_changed(
    self,
    *,
    url: str,
    list_id: str,
    resolved_at: str,
    signature: str,
    idempotency_key: str | None = None,
    extra_headers: Mapping[str, str] | None = None,
) -> WebhookDeliveryResult:
    """POST a signed governance collection-list-changed webhook.

    ``signature`` is the payload-level signature field that predates 9421
    webhook transport signing — it remains required by the schema. The
    9421 signature this method adds protects the transport envelope.

    The payload's ``event`` is fixed to ``"collection_list_changed"``. A falsy
    ``idempotency_key`` is replaced with a freshly generated one. Delivery
    goes through :meth:`send_raw`, whose result is returned.
    """
    effective_key = (
        idempotency_key if idempotency_key else generate_webhook_idempotency_key()
    )
    return await self.send_raw(
        url=url,
        idempotency_key=effective_key,
        payload={
            "idempotency_key": effective_key,
            "event": "collection_list_changed",
            "list_id": list_id,
            "resolved_at": resolved_at,
            "signature": signature,
        },
        extra_headers=extra_headers,
    )

POST a signed governance collection-list-changed webhook.

signature is the payload-level signature field that predates 9421 webhook transport signing — it remains required by the schema. The 9421 signature this method adds protects the transport envelope.

async def send_mcp(self,
*,
url: str,
task_id: str,
status: GeneratedTaskStatus | str,
task_type: str | None = None,
result: AdcpAsyncResponseData | dict[str, Any] | None = None,
timestamp: datetime | None = None,
operation_id: str | None = None,
message: str | None = None,
context_id: str | None = None,
domain: str | None = None,
idempotency_key: str | None = None,
extra_headers: Mapping[str, str] | None = None) ‑> WebhookDeliveryResult
Expand source code
async def send_mcp(
    self,
    *,
    url: str,
    task_id: str,
    status: GeneratedTaskStatus | str,
    task_type: str | None = None,
    result: AdcpAsyncResponseData | dict[str, Any] | None = None,
    timestamp: datetime | None = None,
    operation_id: str | None = None,
    message: str | None = None,
    context_id: str | None = None,
    domain: str | None = None,
    idempotency_key: str | None = None,
    extra_headers: Mapping[str, str] | None = None,
) -> WebhookDeliveryResult:
    """POST a signed MCP-style task-status webhook.

    The payload is built by ``create_mcp_webhook_payload`` from every argument
    except ``url`` and ``extra_headers``, then delivered through
    :meth:`send_raw` using the ``idempotency_key`` found in that payload.

    On retry, prefer :meth:`resend` over calling this again — ``resend``
    replays the exact same bytes, whereas re-invoking ``send_mcp`` with
    the "same" args would produce a fresh ``timestamp`` and potentially
    a different serialized body, which the receiver would dedupe but
    with different observed payload data.
    """
    payload_fields = dict(
        task_id=task_id,
        status=status,
        task_type=task_type,
        result=result,
        timestamp=timestamp,
        operation_id=operation_id,
        message=message,
        context_id=context_id,
        domain=domain,
        idempotency_key=idempotency_key,
    )
    mcp_payload = create_mcp_webhook_payload(**payload_fields)
    # Read the key back from the built payload rather than from the argument,
    # which may be None.
    delivery_key = str(mcp_payload["idempotency_key"])
    return await self.send_raw(
        url=url,
        idempotency_key=delivery_key,
        payload=mcp_payload,
        extra_headers=extra_headers,
    )

POST a signed MCP-style task-status webhook.

On retry, prefer resend() over calling this again — resend() replays the exact same bytes, whereas re-invoking send_mcp() with the "same" args would produce a fresh timestamp and potentially a different serialized body, which the receiver would dedupe but with different observed payload data.

async def send_property_list_changed(self,
*,
url: str,
list_id: str,
resolved_at: str,
signature: str,
idempotency_key: str | None = None,
extra_headers: Mapping[str, str] | None = None) ‑> WebhookDeliveryResult
Expand source code
async def send_property_list_changed(
    self,
    *,
    url: str,
    list_id: str,
    resolved_at: str,
    signature: str,
    idempotency_key: str | None = None,
    extra_headers: Mapping[str, str] | None = None,
) -> WebhookDeliveryResult:
    """POST a signed governance property-list-changed webhook.

    ``list_id``, ``resolved_at`` and ``signature`` are copied into the payload
    under the same names, and ``event`` is fixed to
    ``"property_list_changed"``. A falsy ``idempotency_key`` is replaced with
    a freshly generated one. Delivery goes through :meth:`send_raw`, whose
    result is returned.
    """
    effective_key = (
        idempotency_key if idempotency_key else generate_webhook_idempotency_key()
    )
    return await self.send_raw(
        url=url,
        idempotency_key=effective_key,
        payload={
            "idempotency_key": effective_key,
            "event": "property_list_changed",
            "list_id": list_id,
            "resolved_at": resolved_at,
            "signature": signature,
        },
        extra_headers=extra_headers,
    )

POST a signed governance property-list-changed webhook.

async def send_raw(self,
*,
url: str,
idempotency_key: str,
payload: dict[str, Any],
extra_headers: Mapping[str, str] | None = None) ‑> WebhookDeliveryResult
Expand source code
async def send_raw(
    self,
    *,
    url: str,
    idempotency_key: str,
    payload: dict[str, Any],
    extra_headers: Mapping[str, str] | None = None,
) -> WebhookDeliveryResult:
    """Sign and POST an arbitrary payload — the low-level escape hatch.

    ``idempotency_key`` is a required keyword so the contract is visible in
    the signature rather than hidden in a runtime dict check. It is written
    into a copy of ``payload`` before signing and overrides any
    ``idempotency_key`` the payload already carries, so the two cannot
    disagree. The caller's dict is not modified.

    Raises:
        ValueError: ``idempotency_key`` is not a non-empty ``str``, or the
            serialized body is larger than ``_MAX_BODY_BYTES``.
    """
    key_is_usable = isinstance(idempotency_key, str) and bool(idempotency_key)
    if not key_is_usable:
        raise ValueError("idempotency_key must be a non-empty string")

    envelope = {**payload}
    envelope["idempotency_key"] = idempotency_key

    # Serialize exactly once. These bytes are both what gets signed and what
    # gets posted; letting httpx re-encode via `json=` anywhere in the stack
    # would change the bytes and break the digest.
    wire_bytes = json.dumps(envelope).encode("utf-8")
    wire_size = len(wire_bytes)
    if wire_size > _MAX_BODY_BYTES:
        raise ValueError(
            f"serialized webhook body is {wire_size:,} bytes, over the "
            f"{_MAX_BODY_BYTES:,}-byte cap. Split into smaller webhooks "
            "or use batch-reporting endpoints."
        )
    return await self._send_bytes(
        url=url,
        body=wire_bytes,
        idempotency_key=idempotency_key,
        extra_headers=extra_headers,
    )

Low-level escape hatch: sign + POST an arbitrary payload.

The idempotency_key kwarg is required and is injected into the payload before signing — the visible signature makes the contract impossible to forget, unlike a runtime dict check. If payload already carries an idempotency_key, the kwarg wins so the two cannot disagree.

async def send_revocation_notification(self,
*,
url: str,
rights_id: str,
brand_id: str,
reason: str,
effective_at: str,
idempotency_key: str | None = None,
extra_headers: Mapping[str, str] | None = None) ‑> WebhookDeliveryResult
Expand source code
async def send_revocation_notification(
    self,
    *,
    url: str,
    rights_id: str,
    brand_id: str,
    reason: str,
    effective_at: str,
    idempotency_key: str | None = None,
    extra_headers: Mapping[str, str] | None = None,
) -> WebhookDeliveryResult:
    """POST a signed rights-revocation notification.

    ``rights_id``, ``brand_id``, ``reason`` and ``effective_at`` are copied
    into the payload under the same names. A falsy ``idempotency_key`` is
    replaced with a freshly generated one. Delivery goes through
    :meth:`send_raw`, whose result is returned.
    """
    effective_key = (
        idempotency_key if idempotency_key else generate_webhook_idempotency_key()
    )
    return await self.send_raw(
        url=url,
        idempotency_key=effective_key,
        payload={
            "idempotency_key": effective_key,
            "rights_id": rights_id,
            "brand_id": brand_id,
            "reason": reason,
            "effective_at": effective_at,
        },
        extra_headers=extra_headers,
    )

POST a signed rights-revocation notification.

class WebhookVerifyOptions (*,
jwks_resolver: JwksResolver,
replay_store: ReplayStore | None = None,
revocation_checker: RevocationChecker | None = None,
revocation_list: RevocationList | None = None,
max_skew_seconds: int = 60,
max_window_seconds: int = 300,
label: str = 'sig1',
allowed_algs: frozenset[str] = frozenset({'ed25519', 'ecdsa-p256-sha256'}),
sender_url: str | None = None,
clock: Callable[[], float] = <built-in function time>)
Expand source code
@dataclass(frozen=True, kw_only=True)
class WebhookVerifyOptions:
    """Options for the webhook verifier.

    Subset of :class:`VerifyOptions` — several fields are pinned (tag, adcp_use,
    content-digest policy) because the webhook profile doesn't leave them as
    caller choices.

    Unlike the request verifier, there is no ``now`` field — the webhook
    verifier stamps time-of-check itself, so the same :class:`WebhookVerifyOptions`
    instance can live for the lifetime of your receiver without a factory
    closure around it. Override via ``clock=`` for deterministic tests.

    Instances are frozen and every field is keyword-only. ``jwks_resolver``
    is the only field without a default.
    """

    # Required. NOTE(review): presumably resolves the sender's public keys for
    # signature checks — confirm against JwksResolver.
    jwks_resolver: JwksResolver
    # Optional collaborators; each defaults to None.
    # NOTE(review): what the verifier does when one is absent is not visible
    # here — confirm.
    replay_store: ReplayStore | None = None
    revocation_checker: RevocationChecker | None = None
    revocation_list: RevocationList | None = None
    # Time tolerances in seconds; defaults come from module constants.
    max_skew_seconds: int = DEFAULT_SKEW_SECONDS
    max_window_seconds: int = MAX_WINDOW_SECONDS
    # Signature label; default from SIG_LABEL_DEFAULT.
    label: str = SIG_LABEL_DEFAULT
    # Algorithm names the verifier will accept; default from ALLOWED_ALGS.
    allowed_algs: frozenset[str] = ALLOWED_ALGS
    # NOTE(review): semantics of sender_url are not visible here — confirm.
    sender_url: str | None = None
    # Source of "now" as epoch seconds; swap in a fake for deterministic tests.
    clock: Callable[[], float] = time.time

Options for the webhook verifier.

Subset of VerifyOptions — several fields are pinned (tag, adcp_use, content-digest policy) because the webhook profile doesn't leave them as caller choices.

Unlike the request verifier, there is no `now` field — the webhook verifier stamps time-of-check itself, so the same WebhookVerifyOptions instance can live for the lifetime of your receiver without a factory closure around it. Override via clock= for deterministic tests.

Instance variables

var allowed_algs : frozenset[str]
var jwks_resolver : JwksResolver
var label : str
var max_skew_seconds : int
var max_window_seconds : int
var replay_store : ReplayStore | None
var revocation_checker : RevocationChecker | None
var revocation_list : RevocationList | None
var sender_url : str | None

Methods

def clock(...) ‑> Callable[[], float]

time() -> floating point number

Return the current time in seconds since the Epoch. Fractions of a second may be present if the system clock provides them.