Module adcp.webhooks
Webhook creation, signing, and reception for AdCP agents.
Single front door for both senders and receivers. Underlying modules in
adcp.signing.webhook_* and adcp.webhook_receiver are implementation
details kept for internal organization — prefer the re-exports here for
stability.
Which sender helper to use
- :func:
deliver()— one-shot dispatch for legacyauthentication(Bearer or HMAC-SHA256). Collapses the sender's 6-step boilerplate into one call and signs the exact bytes it POSTs. Deprecated with AdCP 4.0; emits a :class:DeprecationWarning. - :class:
WebhookSender— the AdCP 4.0 default. RFC 9421 signing, shared connection pool, byte-identical replay via :meth:WebhookSender.resend(). Use this for any new integration. - :func:
create_mcp_webhook_payload()/ :func:create_a2a_webhook_payload()plus :func:get_adcp_signed_headers_for_webhook()— low-level path for callers who need full control over serialization, headers, or retry logic.
Functions
def create_a2a_webhook_payload(task_id: str,
status: GeneratedTaskStatus,
context_id: str,
result: AdcpAsyncResponseData | dict[str, Any],
timestamp: datetime | None = None) ‑> a2a.types.Task | a2a.types.TaskStatusUpdateEvent-
Expand source code
def create_a2a_webhook_payload( task_id: str, status: GeneratedTaskStatus, context_id: str, result: AdcpAsyncResponseData | dict[str, Any], timestamp: datetime | None = None, ) -> Task | TaskStatusUpdateEvent: """ Create A2A webhook payload (Task or TaskStatusUpdateEvent). Per A2A specification: - Terminated statuses (completed, failed): Returns Task with artifacts[].parts[] - Intermediate statuses (working, input-required, submitted): Returns TaskStatusUpdateEvent with status.message.parts[] This function helps agent implementations construct properly formatted A2A webhook payloads for sending to clients. Args: task_id: Unique identifier for the task status: Current task status context_id: Session/conversation identifier (required by A2A protocol) timestamp: When the webhook was generated (defaults to current UTC time) result: Task-specific payload (AdCP response data) Returns: Task object for terminated statuses, TaskStatusUpdateEvent for intermediate statuses Examples: Create a completed Task webhook: >>> from adcp.webhooks import create_a2a_webhook_payload >>> from adcp.types import GeneratedTaskStatus >>> >>> task = create_a2a_webhook_payload( ... task_id="task_123", ... status=GeneratedTaskStatus.completed, ... result={"products": [...]}, ... message="Found 5 products" ... ) >>> # task is a Task object with artifacts containing the result Create a working status update: >>> event = create_a2a_webhook_payload( ... task_id="task_456", ... status=GeneratedTaskStatus.working, ... message="Processing 3 of 10 items" ... ) >>> # event is a TaskStatusUpdateEvent with status.message Send A2A webhook via HTTP POST: >>> import httpx >>> from a2a.types import Task >>> >>> payload = create_a2a_webhook_payload(...) >>> # Serialize to dict for JSON >>> if isinstance(payload, Task): ... payload_dict = payload.model_dump(mode='json') ... else: ... payload_dict = payload.model_dump(mode='json') >>> >>> response = await httpx.post(webhook_url, json=payload_dict) """ if timestamp is None: timestamp = datetime.now(timezone.utc) # Convert datetime to ISO string for A2A protocol timestamp_str = timestamp.isoformat() if isinstance(timestamp, datetime) else timestamp # Map GeneratedTaskStatus to A2A status state string status_value = status.value if hasattr(status, "value") else str(status) # Map AdCP status to A2A status state # Note: A2A uses "input-required" (hyphenated) while AdCP uses "input_required" (underscore) status_mapping = { "completed": "completed", "failed": "failed", "working": "working", "submitted": "submitted", "input_required": "input-required", } a2a_status_state = status_mapping.get(status_value, status_value) # Build parts for the message/artifact parts: list[Part] = [] # Add DataPart # Convert AdcpAsyncResponseData to dict if it's a Pydantic model if hasattr(result, "model_dump"): result_dict: dict[str, Any] = result.model_dump(mode="json") else: result_dict = result data_part = DataPart(data=result_dict) parts.append(Part(root=data_part)) # Determine if this is a terminated status (Task) or intermediate (TaskStatusUpdateEvent) is_terminated = status in [GeneratedTaskStatus.completed, GeneratedTaskStatus.failed] # Convert string to TaskState enum task_state_enum = TaskState(a2a_status_state) if is_terminated: # Create Task object with artifacts for terminated statuses task_status = TaskStatus(state=task_state_enum, timestamp=timestamp_str) # Build artifact with parts # Note: Artifact requires artifact_id, use task_id as prefix if parts: artifact = Artifact( artifact_id=f"{task_id}_result", parts=parts, ) artifacts = [artifact] else: artifacts = [] return Task( id=task_id, status=task_status, artifacts=artifacts, context_id=context_id, ) else: # Create TaskStatusUpdateEvent with status.message for intermediate statuses # Build message with parts if parts: message_obj = Message( message_id=f"{task_id}_msg", role=Role.agent, # Agent is responding parts=parts, ) else: message_obj = None task_status = TaskStatus( state=task_state_enum, timestamp=timestamp_str, message=message_obj ) return TaskStatusUpdateEvent( task_id=task_id, status=task_status, context_id=context_id, final=False, # Intermediate statuses are not final )Create A2A webhook payload (Task or TaskStatusUpdateEvent).
Per A2A specification: - Terminated statuses (completed, failed): Returns Task with artifacts[].parts[] - Intermediate statuses (working, input-required, submitted): Returns TaskStatusUpdateEvent with status.message.parts[]
This function helps agent implementations construct properly formatted A2A webhook payloads for sending to clients.
Args
task_id- Unique identifier for the task
status- Current task status
context_id- Session/conversation identifier (required by A2A protocol)
timestamp- When the webhook was generated (defaults to current UTC time)
result- Task-specific payload (AdCP response data)
Returns
Task object for terminated statuses, TaskStatusUpdateEvent for intermediate statuses
Examples
Create a completed Task webhook:
>>> from adcp.webhooks import create_a2a_webhook_payload >>> from adcp.types import GeneratedTaskStatus >>> >>> task = create_a2a_webhook_payload( ... task_id="task_123", ... status=GeneratedTaskStatus.completed, ... result={"products": [...]}, ... message="Found 5 products" ... ) >>> # task is a Task object with artifacts containing the resultCreate a working status update:
>>> event = create_a2a_webhook_payload( ... task_id="task_456", ... status=GeneratedTaskStatus.working, ... message="Processing 3 of 10 items" ... ) >>> # event is a TaskStatusUpdateEvent with status.messageSend A2A webhook via HTTP POST:
>>> import httpx >>> from a2a.types import Task >>> >>> payload = create_a2a_webhook_payload(...) >>> # Serialize to dict for JSON >>> if isinstance(payload, Task): ... payload_dict = payload.model_dump(mode='json') ... else: ... payload_dict = payload.model_dump(mode='json') >>> >>> response = await httpx.post(webhook_url, json=payload_dict) def create_mcp_webhook_payload(task_id: str,
status: GeneratedTaskStatus | str,
result: AdcpAsyncResponseData | dict[str, Any] | None = None,
timestamp: datetime | None = None,
task_type: str | None = None,
operation_id: str | None = None,
message: str | None = None,
context_id: str | None = None,
domain: str | None = None,
idempotency_key: str | None = None) ‑> dict[str, typing.Any]-
Expand source code
def create_mcp_webhook_payload( task_id: str, status: GeneratedTaskStatus | str, result: AdcpAsyncResponseData | dict[str, Any] | None = None, timestamp: datetime | None = None, task_type: str | None = None, operation_id: str | None = None, message: str | None = None, context_id: str | None = None, domain: str | None = None, idempotency_key: str | None = None, ) -> dict[str, Any]: """ Create MCP webhook payload dictionary. This function helps agent implementations construct properly formatted webhook payloads for sending to clients. Args: task_id: Unique identifier for the task status: Current task status task_type: Optionally type of AdCP operation (e.g., "get_products", "create_media_buy") timestamp: When the webhook was generated (defaults to current UTC time) result: Task-specific payload (AdCP response data) operation_id: Publisher-defined operation identifier (deprecated from payload, should be in URL routing, but included for backward compatibility) message: Human-readable summary of task state context_id: Session/conversation identifier domain: AdCP domain this task belongs to idempotency_key: Sender-generated key stable across retries of the same event. Defaults to a freshly-generated UUID v4 — callers retrying delivery of the same event MUST pass the key from their first attempt; passing None twice mints two keys and defeats dedup. Returns: Dictionary matching McpWebhookPayload schema, ready to be sent as JSON Examples: Create a completed webhook with results: >>> from adcp.webhooks import create_mcp_webhook_payload >>> from adcp.types import GeneratedTaskStatus >>> >>> payload = create_mcp_webhook_payload( ... task_id="task_123", ... task_type="get_products", ... status=GeneratedTaskStatus.completed, ... result={"products": [...]}, ... message="Found 5 products" ... ) Create a failed webhook with error: >>> payload = create_mcp_webhook_payload( ... task_id="task_456", ... task_type="create_media_buy", ... status=GeneratedTaskStatus.failed, ... result={"errors": [{"code": "INVALID_INPUT", "message": "..."}]}, ... message="Validation failed" ... ) Create a working status update: >>> payload = create_mcp_webhook_payload( ... task_id="task_789", ... task_type="sync_creatives", ... status=GeneratedTaskStatus.working, ... message="Processing 3 of 10 creatives" ... ) """ if timestamp is None: timestamp = datetime.now(timezone.utc) if idempotency_key is None: idempotency_key = generate_webhook_idempotency_key() # Convert status enum to string value status_value = status.value if hasattr(status, "value") else str(status) # Build payload matching McpWebhookPayload schema payload: dict[str, Any] = { "idempotency_key": idempotency_key, "task_id": task_id, "task_type": task_type, "status": status_value, "timestamp": timestamp.isoformat() if isinstance(timestamp, datetime) else timestamp, } # Add optional fields only if provided if result is not None: # Convert Pydantic model to dict if needed for JSON serialization if hasattr(result, "model_dump"): payload["result"] = result.model_dump(mode="json") else: payload["result"] = result if operation_id is not None: payload["operation_id"] = operation_id if message is not None: payload["message"] = message if context_id is not None: payload["context_id"] = context_id if domain is not None: payload["domain"] = domain return payloadCreate MCP webhook payload dictionary.
This function helps agent implementations construct properly formatted webhook payloads for sending to clients.
Args
task_id- Unique identifier for the task
status- Current task status
task_type- Optionally type of AdCP operation (e.g., "get_products", "create_media_buy")
timestamp- When the webhook was generated (defaults to current UTC time)
result- Task-specific payload (AdCP response data)
operation_id- Publisher-defined operation identifier (deprecated from payload, should be in URL routing, but included for backward compatibility)
message- Human-readable summary of task state
context_id- Session/conversation identifier
domain- AdCP domain this task belongs to
idempotency_key- Sender-generated key stable across retries of the same event. Defaults to a freshly-generated UUID v4 — callers retrying delivery of the same event MUST pass the key from their first attempt; passing None twice mints two keys and defeats dedup.
Returns
Dictionary matching McpWebhookPayload schema, ready to be sent as JSON
Examples
Create a completed webhook with results:
>>> from adcp.webhooks import create_mcp_webhook_payload >>> from adcp.types import GeneratedTaskStatus >>> >>> payload = create_mcp_webhook_payload( ... task_id="task_123", ... task_type="get_products", ... status=GeneratedTaskStatus.completed, ... result={"products": [...]}, ... message="Found 5 products" ... )Create a failed webhook with error:
>>> payload = create_mcp_webhook_payload( ... task_id="task_456", ... task_type="create_media_buy", ... status=GeneratedTaskStatus.failed, ... result={"errors": [{"code": "INVALID_INPUT", "message": "..."}]}, ... message="Validation failed" ... )Create a working status update:
>>> payload = create_mcp_webhook_payload( ... task_id="task_789", ... task_type="sync_creatives", ... status=GeneratedTaskStatus.working, ... message="Processing 3 of 10 creatives" ... ) async def deliver(config: AdCPBaseModel | Mapping[str, Any],
payload: AdCPBaseModel | Task | TaskStatusUpdateEvent | Mapping[str, Any],
*,
client: httpx.AsyncClient | None = None,
extra_headers: Mapping[str, str] | None = None,
timeout_seconds: float | None = None,
token_field: str | None = None) ‑> httpx.Response-
Expand source code
async def deliver( config: AdCPBaseModel | Mapping[str, Any], payload: AdCPBaseModel | Task | TaskStatusUpdateEvent | Mapping[str, Any], *, client: httpx.AsyncClient | None = None, extra_headers: Mapping[str, str] | None = None, timeout_seconds: float | None = None, token_field: str | None = None, ) -> httpx.Response: """Dispatch one legacy-auth webhook in a single call. Collapses the sender's six-step boilerplate (build envelope, serialize, sign, merge headers, POST, echo token) into one call so the signer and the wire see the *same bytes*. The serialization-format drift that plagued the hand-rolled path — ``json=`` in httpx re-serializes the dict and breaks ``Content-Digest`` — is structurally impossible here: the helper JSON-serializes once, signs those bytes, and POSTs those bytes via ``content=``. This helper is for the **legacy** AdCP 3.x authentication schemes (``Bearer`` / ``HMAC-SHA256``) and emits a :class:`DeprecationWarning` on first use. For 4.0+ integrations use :class:`WebhookSender` (RFC 9421). Args: config: A :class:`PushNotificationConfig`, :class:`ReportingWebhook`, or equivalent dict. Must carry ``url`` (``https://`` only) and ``authentication.{schemes, credentials}``. payload: The webhook body. Accepts a Pydantic model (e.g. built via :func:`create_mcp_webhook_payload` / :func:`create_a2a_webhook_payload`), an a2a ``Task`` / ``TaskStatusUpdateEvent``, or a plain dict. Models are dumped with ``mode="json", exclude_none=True``. client: Optional shared ``httpx.AsyncClient``. Recommended in production for connection pooling and egress-policy enforcement (a custom ``httpx.BaseTransport`` is the right place to block SSRF to private IPs — the helper validates scheme but cannot see post-DNS resolution without racing TOCTOU). extra_headers: Merged last. May not override any of ``Content-Type``, ``Content-Digest``, ``Content-Length``, ``Host``, ``Authorization``, ``Signature``, ``Signature-Input``, ``X-AdCP-Signature``, or ``X-AdCP-Timestamp``. Auth and signature-binding headers are sender-owned so the signer and the wire cannot disagree. timeout_seconds: Per-request timeout applied only when the helper creates its own client. Raises ``ValueError`` if set alongside ``client=`` — configure the timeout on the shared client instead. token_field: Opt-in field name for echoing ``config.token`` into the payload body (top-level for MCP dicts, under ``metadata`` for ``Task`` / ``TaskStatusUpdateEvent``). Default ``None`` disables echo; there is no spec-defined field name, so the caller must pick one the receiver agrees to read. Returns: The raw ``httpx.Response``. Caller is responsible for ``response.status_code`` inspection and retry scheduling. For retry, pass the *same, unmutated* payload again — serialization is deterministic so retries produce byte-identical bodies (spec-correct receiver dedup via ``idempotency_key``). Mutating the payload dict between attempts breaks byte-identity; callers who need byte-identical HTTP envelopes across retries (including headers) should use :class:`WebhookSender` and :meth:`WebhookSender.resend`. There is intentionally no ``resend()`` here — the retry contract is "call ``deliver`` again with the same inputs". Raises: ValueError: missing ``url``, non-HTTPS URL, control characters in header values, missing / unknown ``authentication`` (use :class:`WebhookSender` for RFC 9421), overriding a reserved header, or setting ``timeout_seconds`` alongside ``client``. DeprecationWarning (fires once): ``authentication`` is a 3.x fallback. Security notes: * ``config.url`` is buyer-controlled. The helper enforces HTTPS and rejects control characters but does NOT block private / link-local destinations — wire an egress policy via ``client.transport`` to stop SSRF into your VPC or cloud metadata service. * ``config.token`` sits in the request body, so any receiver that logs bodies retains it indefinitely. Treat the token as a medium-sensitivity correlator, not a long-lived secret. * At ``httpx`` DEBUG log level, ``Authorization`` and ``X-AdCP-Signature`` appear in logs — gate DEBUG in production. """ if client is not None and timeout_seconds is not None: raise ValueError( "timeout_seconds cannot be set when client= is provided; " "configure the timeout on your shared httpx.AsyncClient instead." ) url, token, auth_scheme, credentials = _extract_config_fields(config) if auth_scheme is None: raise ValueError( "config.authentication is required for deliver(). " "For RFC 9421 signing (the AdCP 4.0 default), use " "adcp.webhooks.WebhookSender — no helper for unsigned webhooks " "is provided because the spec requires signing." ) if auth_scheme not in ("Bearer", "HMAC-SHA256"): raise ValueError( f"unknown authentication scheme {auth_scheme!r}; " "supported legacy schemes are 'Bearer' and 'HMAC-SHA256'. " "For RFC 9421 use adcp.webhooks.WebhookSender." ) _warn_auth_deprecation_once() body_dict = _payload_to_dict(payload) if token is not None and token_field is not None: _validate_header_value("config.token", token) _inject_push_token(body_dict, token, payload, token_field) # Compact separators so the signer and the wire see byte-identical # payloads, matching the canonical on-wire form pinned by # adcontextprotocol/adcp#2478. ``_compute_legacy_signature`` returns the # same compact body bytes below — we serialize here for the size check # and Bearer path, which both operate on the final transmitted bytes. body_bytes = json.dumps(body_dict, separators=(",", ":")).encode("utf-8") if len(body_bytes) > _MAX_BODY_BYTES: raise ValueError( f"serialized webhook body is {len(body_bytes):,} bytes, over the " f"{_MAX_BODY_BYTES:,}-byte cap. Split into smaller webhooks or use " "the batch-reporting endpoints — most receivers reject bodies over " "10MB at the reverse proxy anyway." ) headers: dict[str, str] = {"Content-Type": "application/json"} if auth_scheme == "Bearer": if not credentials: raise ValueError( "config.authentication.schemes=['Bearer'] requires " "authentication.credentials (min 32 characters — token " "exchanged out-of-band with the receiver)." ) _validate_header_value("authentication.credentials", credentials) headers["Authorization"] = f"Bearer {credentials}" else: # HMAC-SHA256 if not credentials: raise ValueError( "config.authentication.schemes=['HMAC-SHA256'] requires " "authentication.credentials (min 32 characters — shared " "secret exchanged out-of-band with the receiver)." ) _validate_header_value("authentication.credentials", credentials) get_adcp_signed_headers_for_webhook( headers, secret=credentials, timestamp=str(int(time.time())), payload=body_dict, ) if extra_headers: if len(extra_headers) > _MAX_EXTRA_HEADERS: raise ValueError( f"extra_headers has {len(extra_headers)} entries; " f"helper caps at {_MAX_EXTRA_HEADERS}. Pass only the custom " "headers you actually need (trace IDs, correlation IDs)." ) for key in extra_headers: normalized = str(key).lower() if normalized in _RESERVED_HEADERS or normalized.startswith(":"): raise ValueError(_reserved_header_message(normalized, key)) for key, value in extra_headers.items(): _validate_header_value(f"extra_headers[{key!r}]", value) headers[key] = value owns_client = client is None effective_timeout = timeout_seconds if timeout_seconds is not None else _DEFAULT_TIMEOUT_SECONDS http_client = client or httpx.AsyncClient(timeout=effective_timeout) try: return await http_client.post(url, content=body_bytes, headers=headers) finally: if owns_client: await http_client.aclose()Dispatch one legacy-auth webhook in a single call.
Collapses the sender's six-step boilerplate (build envelope, serialize, sign, merge headers, POST, echo token) into one call so the signer and the wire see the same bytes. The serialization-format drift that plagued the hand-rolled path —
json=in httpx re-serializes the dict and breaksContent-Digest— is structurally impossible here: the helper JSON-serializes once, signs those bytes, and POSTs those bytes viacontent=.This helper is for the legacy AdCP 3.x authentication schemes (
Bearer/HMAC-SHA256) and emits a :class:DeprecationWarningon first use. For 4.0+ integrations use :class:WebhookSender(RFC 9421).Args
config- A :class:
PushNotificationConfig, :class:ReportingWebhook, or equivalent dict. Must carryurl(https://only) andauthentication.{schemes, credentials}. payload- The webhook body. Accepts a Pydantic model (e.g. built via
:func:
create_mcp_webhook_payload()/ :func:create_a2a_webhook_payload()), an a2aTask/TaskStatusUpdateEvent, or a plain dict. Models are dumped withmode="json", exclude_none=True. client- Optional shared
httpx.AsyncClient. Recommended in production for connection pooling and egress-policy enforcement (a customhttpx.BaseTransportis the right place to block SSRF to private IPs — the helper validates scheme but cannot see post-DNS resolution without racing TOCTOU). extra_headers- Merged last. May not override any of
Content-Type,Content-Digest,Content-Length,Host,Authorization,Signature,Signature-Input,X-AdCP-Signature, orX-AdCP-Timestamp. Auth and signature-binding headers are sender-owned so the signer and the wire cannot disagree. timeout_seconds- Per-request timeout applied only when the helper
creates its own client. Raises
ValueErrorif set alongsideclient=— configure the timeout on the shared client instead. token_field- Opt-in field name for echoing
config.tokeninto the payload body (top-level for MCP dicts, undermetadataforTask/TaskStatusUpdateEvent). DefaultNonedisables echo; there is no spec-defined field name, so the caller must pick one the receiver agrees to read.
Returns
The raw
httpx.Response. Caller is responsible forresponse.status_codeinspection and retry scheduling. For retry, pass the same, unmutated payload again — serialization is deterministic so retries produce byte-identical bodies (spec-correct receiver dedup viaidempotency_key). Mutating the payload dict between attempts breaks byte-identity; callers who need byte-identical HTTP envelopes across retries (including headers) should use :class:WebhookSenderand :meth:WebhookSender.resend(). There is intentionally noresend()here — the retry contract is "calldeliver()again with the same inputs".Raises
ValueError- missing
url, non-HTTPS URL, control characters in header values, missing / unknownauthentication(use :class:WebhookSenderfor RFC 9421), overriding a reserved header, or settingtimeout_secondsalongsideclient.
DeprecationWarning (fires once):
authenticationis a 3.x fallback. Security notes: *config.urlis buyer-controlled. The helper enforces HTTPS and rejects control characters but does NOT block private / link-local destinations — wire an egress policy viaclient.transportto stop SSRF into your VPC or cloud metadata service. *config.tokensits in the request body, so any receiver that logs bodies retains it indefinitely. Treat the token as a medium-sensitivity correlator, not a long-lived secret. * AthttpxDEBUG log level,AuthorizationandX-AdCP-Signatureappear in logs — gate DEBUG in production. def extract_webhook_result_data(webhook_payload: dict[str, Any]) ‑> AdcpAsyncResponseData | None-
Expand source code
def extract_webhook_result_data(webhook_payload: dict[str, Any]) -> AdcpAsyncResponseData | None: """ Extract result data from webhook payload (MCP or A2A format). This utility function handles webhook payloads from both MCP and A2A protocols, extracting the result data regardless of the webhook format. Useful for quick inspection, logging, or custom webhook routing logic without requiring full client initialization. Protocol Detection: - A2A Task: Has "artifacts" field (terminated statuses: completed, failed) - A2A TaskStatusUpdateEvent: Has nested "status.message" structure (intermediate statuses) - MCP: Has "result" field directly Args: webhook_payload: Raw webhook dictionary from HTTP request (JSON-deserialized) Returns: AdcpAsyncResponseData union type containing the extracted AdCP response, or None if no result present. For A2A webhooks, unwraps data from artifacts/message parts structure. For MCP webhooks, returns the result field directly. Examples: Extract from MCP webhook: >>> mcp_payload = { ... "task_id": "task_123", ... "task_type": "create_media_buy", ... "status": "completed", ... "timestamp": "2025-01-15T10:00:00Z", ... "result": {"media_buy_id": "mb_123", "buyer_ref": "ref_123", "packages": []} ... } >>> result = extract_webhook_result_data(mcp_payload) >>> print(result["media_buy_id"]) mb_123 Extract from A2A Task webhook: >>> a2a_task_payload = { ... "id": "task_456", ... "context_id": "ctx_456", ... "status": {"state": "completed", "timestamp": "2025-01-15T10:00:00Z"}, ... "artifacts": [ ... { ... "artifact_id": "artifact_456", ... "parts": [ ... { ... "data": { ... "media_buy_id": "mb_456", ... "buyer_ref": "ref_456", ... "packages": [] ... } ... } ... ] ... } ... ] ... } >>> result = extract_webhook_result_data(a2a_task_payload) >>> print(result["media_buy_id"]) mb_456 Extract from A2A TaskStatusUpdateEvent webhook: >>> a2a_event_payload = { ... "task_id": "task_789", ... "context_id": "ctx_789", ... "status": { ... "state": "working", ... "timestamp": "2025-01-15T10:00:00Z", ... "message": { ... "message_id": "msg_789", ... "role": "agent", ... "parts": [ ... {"data": {"current_step": "processing", "percentage": 50}} ... ] ... } ... }, ... "final": False ... } >>> result = extract_webhook_result_data(a2a_event_payload) >>> print(result["percentage"]) 50 Handle webhook with no result: >>> empty_payload = {"task_id": "task_000", "status": "working", "timestamp": "..."} >>> result = extract_webhook_result_data(empty_payload) >>> print(result) None """ # Detect A2A Task format (has "artifacts" field) if "artifacts" in webhook_payload: # Extract from task.artifacts[].parts[] artifacts = webhook_payload.get("artifacts", []) if not artifacts: return None # Use last artifact (most recent) target_artifact = artifacts[-1] parts = target_artifact.get("parts", []) if not parts: return None # Find DataPart (skip TextPart) for part in parts: # Check if this part has "data" field (DataPart) if "data" in part: data = part["data"] # Unwrap {"response": {...}} wrapper if present (A2A convention) if isinstance(data, dict) and "response" in data and len(data) == 1: return cast(AdcpAsyncResponseData, data["response"]) return cast(AdcpAsyncResponseData, data) return None # Detect A2A TaskStatusUpdateEvent format (has nested "status.message") status = webhook_payload.get("status") if isinstance(status, dict): message = status.get("message") if isinstance(message, dict): # Extract from status.message.parts[] parts = message.get("parts", []) if not parts: return None # Find DataPart for part in parts: if "data" in part: data = part["data"] # Unwrap {"response": {...}} wrapper if present if isinstance(data, dict) and "response" in data and len(data) == 1: return cast(AdcpAsyncResponseData, data["response"]) return cast(AdcpAsyncResponseData, data) return None # MCP format: result field directly return cast(AdcpAsyncResponseData | None, webhook_payload.get("result"))Extract result data from webhook payload (MCP or A2A format).
This utility function handles webhook payloads from both MCP and A2A protocols, extracting the result data regardless of the webhook format. Useful for quick inspection, logging, or custom webhook routing logic without requiring full client initialization.
Protocol Detection: - A2A Task: Has "artifacts" field (terminated statuses: completed, failed) - A2A TaskStatusUpdateEvent: Has nested "status.message" structure (intermediate statuses) - MCP: Has "result" field directly
Args
webhook_payload- Raw webhook dictionary from HTTP request (JSON-deserialized)
Returns
AdcpAsyncResponseData union type containing the extracted AdCP response, or None if no result present. For A2A webhooks, unwraps data from artifacts/message parts structure. For MCP webhooks, returns the result field directly.
Examples
Extract from MCP webhook:
>>> mcp_payload = { ... "task_id": "task_123", ... "task_type": "create_media_buy", ... "status": "completed", ... "timestamp": "2025-01-15T10:00:00Z", ... "result": {"media_buy_id": "mb_123", "buyer_ref": "ref_123", "packages": []} ... } >>> result = extract_webhook_result_data(mcp_payload) >>> print(result["media_buy_id"]) mb_123Extract from A2A Task webhook:
>>> a2a_task_payload = { ... "id": "task_456", ... "context_id": "ctx_456", ... "status": {"state": "completed", "timestamp": "2025-01-15T10:00:00Z"}, ... "artifacts": [ ... { ... "artifact_id": "artifact_456", ... "parts": [ ... { ... "data": { ... "media_buy_id": "mb_456", ... "buyer_ref": "ref_456", ... "packages": [] ... } ... } ... ] ... } ... ] ... } >>> result = extract_webhook_result_data(a2a_task_payload) >>> print(result["media_buy_id"]) mb_456Extract from A2A TaskStatusUpdateEvent webhook:
>>> a2a_event_payload = { ... "task_id": "task_789", ... "context_id": "ctx_789", ... "status": { ... "state": "working", ... "timestamp": "2025-01-15T10:00:00Z", ... "message": { ... "message_id": "msg_789", ... "role": "agent", ... "parts": [ ... {"data": {"current_step": "processing", "percentage": 50}} ... ] ... } ... }, ... "final": False ... } >>> result = extract_webhook_result_data(a2a_event_payload) >>> print(result["percentage"]) 50Handle webhook with no result:
>>> empty_payload = {"task_id": "task_000", "status": "working", "timestamp": "..."} >>> result = extract_webhook_result_data(empty_payload) >>> print(result) None def generate_webhook_idempotency_key() ‑> str-
Expand source code
def generate_webhook_idempotency_key() -> str: """Generate a cryptographically random idempotency_key for a webhook event. Returns a UUID v4 prefixed with ``whk_`` — matches the example format in ``webhooks.mdx`` and stays within the spec's length + charset bounds (``^[A-Za-z0-9_.:-]{16,255}$``). Publishers MUST generate this once per distinct event and reuse the same value when retrying delivery. Do NOT call this function again on retry — it would mint a fresh UUID and defeat the dedup contract. """ return f"whk_{uuid.uuid4()}"Generate a cryptographically random idempotency_key for a webhook event.
Returns a UUID v4 prefixed with
whk_— matches the example format inwebhooks.mdxand stays within the spec's length + charset bounds (^[A-Za-z0-9_.:-]{16,255}$).Publishers MUST generate this once per distinct event and reuse the same value when retrying delivery. Do NOT call this function again on retry — it would mint a fresh UUID and defeat the dedup contract.
def get_adcp_signed_headers_for_webhook(headers: dict[str, Any],
secret: str,
timestamp: str | int | None,
payload: dict[str, Any] | AdCPBaseModel) ‑> dict[str, typing.Any]-
Expand source code
def get_adcp_signed_headers_for_webhook( headers: dict[str, Any], secret: str, timestamp: str | int | None, payload: dict[str, Any] | AdCPBaseModel, ) -> dict[str, Any]: """ Generate AdCP-compliant signed headers for webhook delivery. This function creates a cryptographic signature that proves the webhook came from an authorized agent and protects against replay attacks by including a timestamp in the signed message. The function adds two headers to the provided headers dict: - X-AdCP-Signature: HMAC-SHA256 signature in format "sha256=<hex_digest>" - X-AdCP-Timestamp: Unix timestamp in seconds The signing algorithm: 1. Constructs message as "{timestamp}.{json_payload}" 2. JSON-serializes payload with default separators (matches wire format from json= kwarg) 3. UTF-8 encodes the message 4. HMAC-SHA256 signs with the shared secret 5. Hex-encodes and prefixes with "sha256=" Args: headers: Existing headers dictionary to add signature headers to secret: Shared secret key for HMAC signing timestamp: Unix timestamp in seconds (str or int). If None, uses current time. payload: Webhook payload (dict or Pydantic model - will be JSON-serialized) Returns: The modified headers dictionary with signature headers added Examples: Sign and send an MCP webhook: >>> import time >>> from adcp.webhooks import create_mcp_webhook_payload >>> from adcp.webhooks import get_adcp_signed_headers_for_webhook >>> >>> payload = create_mcp_webhook_payload( ... task_id="task_123", ... task_type="get_products", ... status="completed", ... result={"products": [...]} ... ) >>> headers = {"Content-Type": "application/json"} >>> signed_headers = get_adcp_signed_headers_for_webhook( ... headers, secret="my-webhook-secret", timestamp=str(int(time.time())), ... payload=payload, ... ) >>> >>> # Send webhook with signed headers >>> import httpx >>> response = await httpx.post( ... webhook_url, ... json=payload, ... headers=signed_headers ... ) Headers will contain: >>> print(signed_headers) { "Content-Type": "application/json", "X-AdCP-Signature": "sha256=a1b2c3...", "X-AdCP-Timestamp": "1773185740" } """ signature_headers, _body_bytes = _compute_legacy_signature( secret=secret, timestamp=timestamp, payload=payload ) headers.update(signature_headers) return headersGenerate AdCP-compliant signed headers for webhook delivery.
This function creates a cryptographic signature that proves the webhook came from an authorized agent and protects against replay attacks by including a timestamp in the signed message.
The function adds two headers to the provided headers dict: - X-AdCP-Signature: HMAC-SHA256 signature in format "sha256=
" - X-AdCP-Timestamp: Unix timestamp in seconds The signing algorithm: 1. Constructs message as "{timestamp}.{json_payload}" 2. JSON-serializes payload with default separators (matches wire format from json= kwarg) 3. UTF-8 encodes the message 4. HMAC-SHA256 signs with the shared secret 5. Hex-encodes and prefixes with "sha256="
Args
headers- Existing headers dictionary to add signature headers to
secret- Shared secret key for HMAC signing
timestamp- Unix timestamp in seconds (str or int). If None, uses current time.
payload- Webhook payload (dict or Pydantic model - will be JSON-serialized)
Returns
The modified headers dictionary with signature headers added
Examples
Sign and send an MCP webhook:
>>> import time >>> from adcp.webhooks import create_mcp_webhook_payload >>> from adcp.webhooks import get_adcp_signed_headers_for_webhook >>> >>> payload = create_mcp_webhook_payload( ... task_id="task_123", ... task_type="get_products", ... status="completed", ... result={"products": [...]} ... ) >>> headers = {"Content-Type": "application/json"} >>> signed_headers = get_adcp_signed_headers_for_webhook( ... headers, secret="my-webhook-secret", timestamp=str(int(time.time())), ... payload=payload, ... ) >>> >>> # Send webhook with signed headers >>> import httpx >>> response = await httpx.post( ... webhook_url, ... json=payload, ... headers=signed_headers ... )Headers will contain:
>>> print(signed_headers) { "Content-Type": "application/json", "X-AdCP-Signature": "sha256=a1b2c3...", "X-AdCP-Timestamp": "1773185740" } def sign_legacy_webhook(secret: str,
payload: dict[str, Any] | AdCPBaseModel,
*,
timestamp: str | int | None = None,
headers: dict[str, Any] | None = None) ‑> tuple[dict[str, str], bytes]-
Expand source code
def sign_legacy_webhook( secret: str, payload: dict[str, Any] | AdCPBaseModel, *, timestamp: str | int | None = None, headers: dict[str, Any] | None = None, ) -> tuple[dict[str, str], bytes]: """Return ``(signed_headers, body_bytes)`` for a legacy HMAC webhook. Byte-equality between signature input and HTTP body is guaranteed — callers POST ``content=body_bytes`` instead of ``json=payload``, so the separator-drift trap that caused silent 401s in every spaced-vs-compact interop is structurally impossible here. This is a lower-level companion to :func:`deliver` for callers who need to own the HTTP transport themselves (custom auth, pre-configured ``httpx.AsyncClient``, non-httpx clients). For the one-shot "send a webhook" path, prefer :func:`deliver`. The returned ``body_bytes`` use compact separators (``","``/``":"``) matching the canonical on-wire form pinned by adcontextprotocol/adcp#2478. Example: >>> signed, body = sign_legacy_webhook("shared-secret", payload) >>> headers = {**signed, "Content-Type": "application/json"} >>> await client.post(url, content=body, headers=headers) """ signature_headers, body_bytes = _compute_legacy_signature( secret=secret, timestamp=timestamp, payload=payload ) if headers is not None: merged = {str(k): str(v) for k, v in headers.items()} merged.update(signature_headers) return merged, body_bytes return signature_headers, body_bytesReturn
(signed_headers, body_bytes)for a legacy HMAC webhook.Byte-equality between signature input and HTTP body is guaranteed — callers POST
content=body_bytesinstead ofjson=payload, so the separator-drift trap that caused silent 401s in every spaced-vs-compact interop is structurally impossible here.This is a lower-level companion to :func:
deliver()for callers who need to own the HTTP transport themselves (custom auth, pre-configuredhttpx.AsyncClient, non-httpx clients). For the one-shot "send a webhook" path, prefer :func:deliver().The returned
body_bytesuse compact separators (","/":") matching the canonical on-wire form pinned by adcontextprotocol/adcp#2478.Example
>>> signed, body = sign_legacy_webhook("shared-secret", payload) >>> headers = {**signed, "Content-Type": "application/json"} >>> await client.post(url, content=body, headers=headers) def sign_webhook(*,
method: str,
url: str,
headers: Mapping[str, str],
body: bytes,
private_key: PrivateKey,
key_id: str,
alg: str,
created: int | None = None,
expires_in_seconds: int = 300,
nonce: str | None = None,
label: str = 'sig1') ‑> SignedHeaders-
Expand source code
def sign_webhook( *, method: str, url: str, headers: Mapping[str, str], body: bytes, private_key: PrivateKey, key_id: str, alg: str, created: int | None = None, expires_in_seconds: int = DEFAULT_EXPIRES_IN_SECONDS, nonce: str | None = None, label: str = SIG_LABEL_DEFAULT, ) -> SignedHeaders: """Sign an outgoing webhook POST per adcp/webhook-signing/v1. ``cover_content_digest=True`` and ``tag=WEBHOOK_TAG`` are pinned. The caller attaches ``SignedHeaders.as_dict()`` to the outgoing HTTP request. The ``method`` is normally ``"POST"`` for webhook delivery; passed through unchanged so callers signing a retried ``PUT`` or variant delivery verb are not forced into an extra translation. See also: :class:`adcp.webhooks.WebhookSender` — higher-level one-call helper that builds the payload, signs, and POSTs in a single call. Prefer it unless you need to own the HTTP transport yourself. """ return sign_request( method=method, url=url, headers=headers, body=body, private_key=private_key, key_id=key_id, alg=alg, cover_content_digest=True, created=created, expires_in_seconds=expires_in_seconds, nonce=nonce, tag=WEBHOOK_TAG, label=label, )Sign an outgoing webhook POST per adcp/webhook-signing/v1.
cover_content_digest=Trueandtag=WEBHOOK_TAGare pinned. The caller attachesSignedHeaders.as_dict()to the outgoing HTTP request.The
methodis normally"POST"for webhook delivery; passed through unchanged so callers signing a retriedPUTor variant delivery verb are not forced into an extra translation.See also: :class:
WebhookSender— higher-level one-call helper that builds the payload, signs, and POSTs in a single call. Prefer it unless you need to own the HTTP transport yourself. def verify_webhook_hmac(*,
headers: Mapping[str, str],
body: bytes,
options: LegacyWebhookHmacOptions) ‑> VerifiedLegacyWebhookSender-
Expand source code
def verify_webhook_hmac( *, headers: Mapping[str, str], body: bytes, options: LegacyWebhookHmacOptions, ) -> VerifiedLegacyWebhookSender: """Verify an HMAC-SHA256-signed webhook body per the legacy scheme. Raises :class:`LegacyWebhookHmacError` on any failure. Fires a one-time :class:`DeprecationWarning` — operators SHOULD migrate to 9421 before AdCP 4.0 removes the ``authentication`` field. ``headers`` can be any ``Mapping[str, str]`` — ``dict``, ``werkzeug.datastructures.EnvironHeaders``, Starlette's ``Headers``, etc. Keys are case-folded internally. """ _warn_once() header_map = {str(k).lower(): str(v) for k, v in headers.items()} sig_value = header_map.get(_SIGNATURE_HEADER) ts_value = header_map.get(_TIMESTAMP_HEADER) if sig_value is None or ts_value is None: raise LegacyWebhookHmacError("missing X-AdCP-Signature or X-AdCP-Timestamp header") if not sig_value.startswith(_HEX_PREFIX): raise LegacyWebhookHmacError( f"signature must start with {_HEX_PREFIX!r}, got {sig_value[:16]!r}" ) hex_sig = sig_value[len(_HEX_PREFIX) :] try: ts_int = int(ts_value) except ValueError as exc: raise LegacyWebhookHmacError(f"invalid timestamp {ts_value!r}") from exc # Bound on the skew window. Matches the 9421 max window (300s) exactly — # the 9421 pipeline applies DEFAULT_SKEW_SECONDS inside its window check, # so both schemes have the same "skew budget" on the wire. Do NOT add # DEFAULT_SKEW_SECONDS on top; that would double-count and yield a 360s # budget for HMAC vs 300s for 9421. skew = abs(options.now - ts_int) if skew > options.window_seconds: raise LegacyWebhookHmacError( f"timestamp skew {skew:.0f}s exceeds window {options.window_seconds}s" ) # The sender constructs the message as f"{timestamp}.{json_payload}" # where json_payload is the body bytes as UTF-8. Re-decoding a dict would # re-serialize with potentially different key order and break the # signature — verify against the raw bytes as received. message = f"{ts_value}.".encode() + body expected = hmac.new(options.secret, message, hashlib.sha256).hexdigest() # Constant-time compare — hmac.compare_digest handles str/str. if not hmac.compare_digest(expected, hex_sig): raise LegacyWebhookHmacError("signature did not match") return VerifiedLegacyWebhookSender(sender_identity=options.sender_identity)Verify an HMAC-SHA256-signed webhook body per the legacy scheme.
Raises :class:
LegacyWebhookHmacErroron any failure. Fires a one-time :class:DeprecationWarning— operators SHOULD migrate to 9421 before AdCP 4.0 removes theauthenticationfield.headerscan be anyMapping[str, str]—dict,werkzeug.datastructures.EnvironHeaders, Starlette'sHeaders, etc. Keys are case-folded internally. def verify_webhook_signature(*,
method: str,
url: str,
headers: Mapping[str, str],
body: bytes,
options: WebhookVerifyOptions) ‑> VerifiedWebhookSender-
Expand source code
def verify_webhook_signature( *, method: str, url: str, headers: Mapping[str, str], body: bytes, options: WebhookVerifyOptions, ) -> VerifiedWebhookSender: """Verify an incoming signed webhook per the adcp/webhook-signing/v1 profile. Raises :class:`SignatureVerificationError` with a ``webhook_signature_*`` code on failure. Success returns a :class:`VerifiedWebhookSender` carrying the identity to scope dedup state by. """ _precheck_webhook_has_required_components(headers) request_options = VerifyOptions( now=options.clock(), capability=VerifierCapability( supported=True, covers_content_digest="required", required_for=frozenset({"webhook"}), ), operation="webhook", jwks_resolver=options.jwks_resolver, replay_store=options.replay_store, revocation_checker=options.revocation_checker, revocation_list=options.revocation_list, max_skew_seconds=options.max_skew_seconds, max_window_seconds=options.max_window_seconds, label=options.label, expected_tag=WEBHOOK_TAG, expected_adcp_use=ADCP_USE_WEBHOOK, allowed_algs=options.allowed_algs, agent_url=options.sender_url, ) try: signer: VerifiedSigner = verify_request_signature( method=method, url=url, headers=headers, body=body, options=request_options ) except SignatureVerificationError as exc: raise _retag_to_webhook(exc) from exc return VerifiedWebhookSender( key_id=signer.key_id, alg=signer.alg, label=signer.label, verified_at=signer.verified_at, sender_url=signer.agent_url, )Verify an incoming signed webhook per the adcp/webhook-signing/v1 profile.
Raises :class:
SignatureVerificationErrorwith awebhook_signature_*code on failure. Success returns a :class:VerifiedWebhookSendercarrying the identity to scope dedup state by.
Classes
class LegacyHmacFallback (options_for: Callable[[Mapping[str, str]], LegacyWebhookHmacOptions | None],
only_when_9421_absent: bool = True)-
Expand source code
@dataclass(frozen=True) class LegacyHmacFallback: """Opt-in policy for accepting HMAC-SHA256 senders during 3.x migration. The default behavior of the receiver is to reject any request that fails 9421 verification. Pass an instance of this class to ``WebhookReceiverConfig`` to accept HMAC-signed webhooks as a fallback. :param options_for: callback that returns a populated :class:`LegacyWebhookHmacOptions` given the incoming request headers. Your implementation resolves the sender (from Bearer, hostname, or legacy shared-secret tag) and returns the secret + sender_identity tuple the verifier needs. Return ``None`` to decline the fallback for this request (rejection follows the 9421-only failure path). :param only_when_9421_absent: when ``True`` (default), HMAC fallback only fires when no 9421 headers are present at all. When a request carries 9421 headers that FAIL verification, it still rejects — preventing a downgrade attack where a MITM strips the 9421 signature and replaces it with a forged HMAC one it knows the secret for. When ``False``, HMAC is tried on any 9421 failure; only set this for testing or known homogenous sender cohorts. """ options_for: Callable[[Mapping[str, str]], LegacyWebhookHmacOptions | None] only_when_9421_absent: bool = True @classmethod def from_shared_secret( cls, *, secret: bytes, sender_identity: str, only_when_9421_absent: bool = True, window_seconds: int = 300, ) -> LegacyHmacFallback: """Convenience constructor for the "one secret, one sender" case. Covers the common 3.x migration setup where the receiver has exactly one publisher on the legacy scheme and binds them to a known ``sender_identity`` (typically a buyer-defined string). For multi- sender or header-derived-identity setups, construct with an ``options_for`` callback directly. """ import time as _time def _options_for(_headers: Mapping[str, str]) -> LegacyWebhookHmacOptions: return LegacyWebhookHmacOptions( secret=secret, sender_identity=sender_identity, now=_time.time(), window_seconds=window_seconds, ) return cls( options_for=_options_for, only_when_9421_absent=only_when_9421_absent, )Opt-in policy for accepting HMAC-SHA256 senders during 3.x migration.
The default behavior of the receiver is to reject any request that fails 9421 verification. Pass an instance of this class to
WebhookReceiverConfigto accept HMAC-signed webhooks as a fallback.:param options_for: callback that returns a populated :class:
LegacyWebhookHmacOptionsgiven the incoming request headers. Your implementation resolves the sender (from Bearer, hostname, or legacy shared-secret tag) and returns the secret + sender_identity tuple the verifier needs. ReturnNoneto decline the fallback for this request (rejection follows the 9421-only failure path). :param only_when_9421_absent: whenTrue(default), HMAC fallback only fires when no 9421 headers are present at all. When a request carries 9421 headers that FAIL verification, it still rejects — preventing a downgrade attack where a MITM strips the 9421 signature and replaces it with a forged HMAC one it knows the secret for. WhenFalse, HMAC is tried on any 9421 failure; only set this for testing or known homogenous sender cohorts.Static methods
-
Convenience constructor for the "one secret, one sender" case.
Covers the common 3.x migration setup where the receiver has exactly one publisher on the legacy scheme and binds them to a known
sender_identity(typically a buyer-defined string). For multi- sender or header-derived-identity setups, construct with anoptions_forcallback directly.
Instance variables
var only_when_9421_absent : boolvar options_for : Callable[[Mapping[str, str]], LegacyWebhookHmacOptions | None]
-
class LegacyWebhookHmacError (reason: str)-
Expand source code
class LegacyWebhookHmacError(Exception): """Raised when HMAC-SHA256 legacy verification fails. Distinct from :class:`adcp.signing.errors.SignatureVerificationError` so callers can distinguish legacy-path failures from 9421-path failures — operators want to know which scheme fired when diagnosing a 401. """ def __init__(self, reason: str) -> None: super().__init__(reason) self.reason = reasonRaised when HMAC-SHA256 legacy verification fails.
Distinct from :class:
SignatureVerificationErrorso callers can distinguish legacy-path failures from 9421-path failures — operators want to know which scheme fired when diagnosing a 401.Ancestors
- builtins.Exception
- builtins.BaseException
class LegacyWebhookHmacOptions (secret: bytes, sender_identity: str, now: float, window_seconds: int = 300)-
Expand source code
@dataclass(frozen=True) class LegacyWebhookHmacOptions: """Options for the HMAC verifier. :param secret_resolver: callable ``(header_map) -> bytes | None`` that returns the shared secret for this incoming request. The receiver is responsible for determining sender identity from headers (Bearer token, IP allowlist, hostname) and looking up the secret bound to that sender. The resolver returns ``None`` when no sender can be authenticated — the verifier then rejects without attempting compare. :param sender_identity: the string used to scope dedup after verification succeeds. In HMAC-legacy, there is no cryptographic sender identity (the secret IS the identity), so the caller provides one — typically derived from the same lookup that produced the secret. :param now: current time, epoch seconds. Defaults fetched at call time. :param window_seconds: accepted skew. Sender timestamp outside ``[now - window, now + window]`` rejects. """ secret: bytes sender_identity: str now: float window_seconds: int = _DEFAULT_WINDOW_SECONDSOptions for the HMAC verifier.
:param secret_resolver: callable
(header_map) -> bytes | Nonethat returns the shared secret for this incoming request. The receiver is responsible for determining sender identity from headers (Bearer token, IP allowlist, hostname) and looking up the secret bound to that sender. The resolver returnsNonewhen no sender can be authenticated — the verifier then rejects without attempting compare. :param sender_identity: the string used to scope dedup after verification succeeds. In HMAC-legacy, there is no cryptographic sender identity (the secret IS the identity), so the caller provides one — typically derived from the same lookup that produced the secret. :param now: current time, epoch seconds. Defaults fetched at call time. :param window_seconds: accepted skew. Sender timestamp outside[now - window, now + window]rejects.Instance variables
var now : floatvar secret : bytesvar sender_identity : strvar window_seconds : int
class MemoryBackend (*, clock: Callable[[], float] = <built-in function time>)-
Expand source code
class MemoryBackend(IdempotencyBackend): """In-process dict-backed store. Suitable for tests, single-process reference implementations, and local development. **Not suitable for multi-process deployments** — each worker has its own cache, so a retry that lands on a different worker is treated as a fresh request. Thread safety: the backend uses an :class:`asyncio.Lock` to serialize mutations of the shared dict. Reads go through the lock too; for a pure in-process backend this is cheap and prevents torn reads across concurrent ``get``/``put`` interleaving. :param clock: Callable returning the current epoch seconds. Override for tests that need to advance time deterministically without monkeypatching :mod:`time`. Defaults to :func:`time.time`. """ def __init__(self, *, clock: Callable[[], float] = time.time) -> None: self._store: dict[tuple[str, str], CachedResponse] = {} self._lock = asyncio.Lock() self._clock = clock async def get( self, principal_id: str, key: str ) -> CachedResponse | None: async with self._lock: entry = self._store.get((principal_id, key)) if entry is None: return None if entry.expires_at_epoch <= self._clock(): # Lazy expiry — drop the stale entry so the next request # treats the slot as fresh and races to repopulate. del self._store[(principal_id, key)] return None return entry async def put( self, principal_id: str, key: str, entry: CachedResponse, ) -> None: async with self._lock: self._store[(principal_id, key)] = entry async def delete_expired(self, now_epoch: float | None = None) -> int: cutoff = now_epoch if now_epoch is not None else self._clock() async with self._lock: stale = [k for k, v in self._store.items() if v.expires_at_epoch <= cutoff] for k in stale: del self._store[k] return len(stale) async def clear(self) -> None: """Remove all cached entries. Test-suite hook — handy for resetting state between fixtures when a single :class:`MemoryBackend` is shared across multiple tests. """ async with self._lock: self._store.clear() async def _size(self) -> int: """Test-only: return the current entry count.""" async with self._lock: return len(self._store)In-process dict-backed store.
Suitable for tests, single-process reference implementations, and local development. Not suitable for multi-process deployments — each worker has its own cache, so a retry that lands on a different worker is treated as a fresh request.
Thread safety: the backend uses an :class:
asyncio.Lockto serialize mutations of the shared dict. Reads go through the lock too; for a pure in-process backend this is cheap and prevents torn reads across concurrentget/putinterleaving.:param clock: Callable returning the current epoch seconds. Override for tests that need to advance time deterministically without monkeypatching :mod:
time. Defaults to :func:time.time.Ancestors
- IdempotencyBackend
- abc.ABC
Methods
async def clear(self) ‑> None-
Expand source code
async def clear(self) -> None: """Remove all cached entries. Test-suite hook — handy for resetting state between fixtures when a single :class:`MemoryBackend` is shared across multiple tests. """ async with self._lock: self._store.clear()Remove all cached entries.
Test-suite hook — handy for resetting state between fixtures when a single :class:
MemoryBackendis shared across multiple tests.
Inherited members
class VerifiedLegacyWebhookSender (sender_identity: str)-
Expand source code
@dataclass(frozen=True) class VerifiedLegacyWebhookSender: """Identity returned by the HMAC verifier on success. Shape-compatible with the 9421 ``VerifiedWebhookSender.as_sender_identity`` so downstream dedup code treats both the same. """ sender_identity: str def as_sender_identity(self) -> str: return self.sender_identityIdentity returned by the HMAC verifier on success.
Shape-compatible with the 9421
VerifiedWebhookSender.as_sender_identity()so downstream dedup code treats both the same.Instance variables
var sender_identity : str
Methods
def as_sender_identity(self) ‑> str-
Expand source code
def as_sender_identity(self) -> str: return self.sender_identity
class VerifiedSignerLike (*args, **kwargs)-
Expand source code
@runtime_checkable class VerifiedSignerLike(Protocol): """Anything with ``as_sender_identity() -> str``. Both :class:`VerifiedWebhookSender` (9421) and :class:`VerifiedLegacyWebhookSender` (HMAC) implement this shape, so the receiver treats both verification paths identically downstream. """ def as_sender_identity(self) -> str: ...Anything with
as_sender_identity() -> str.Both :class:
VerifiedWebhookSender(9421) and :class:VerifiedLegacyWebhookSender(HMAC) implement this shape, so the receiver treats both verification paths identically downstream.Ancestors
- typing.Protocol
- typing.Generic
Methods
def as_sender_identity(self) ‑> str-
Expand source code
def as_sender_identity(self) -> str: ...
class VerifiedWebhookSender (key_id: str,
alg: str,
label: str,
verified_at: float,
sender_url: str | None = None)-
Expand source code
@dataclass(frozen=True) class VerifiedWebhookSender: """Returned on successful webhook verification. Distinct type from :class:`VerifiedSigner` so a caller that mistakenly passes a request-verified signer into a webhook-scoped dedup store (or the reverse) will fail to type-check. Both carry the same bytes; the type separation is a guardrail, not a data difference. """ key_id: str alg: str label: str verified_at: float sender_url: str | None = None def as_sender_identity(self) -> str: """Identity string used to scope dedup state. Webhook dedup MUST be scoped to the authenticated sender — trusting a payload field for identity is the attack-surface hole the spec's "Sender requirements" paragraph calls out. The key_id is the cryptographically verified identity; prefer ``sender_url:key_id`` when a sender URL is present to tolerate JWKS reuse across co-deployed senders. """ if self.sender_url is not None: return f"{self.sender_url}|{self.key_id}" return self.key_idReturned on successful webhook verification.
Distinct type from :class:
VerifiedSignerso a caller that mistakenly passes a request-verified signer into a webhook-scoped dedup store (or the reverse) will fail to type-check. Both carry the same bytes; the type separation is a guardrail, not a data difference.Instance variables
var alg : strvar key_id : strvar label : strvar sender_url : str | Nonevar verified_at : float
Methods
def as_sender_identity(self) ‑> str-
Expand source code
def as_sender_identity(self) -> str: """Identity string used to scope dedup state. Webhook dedup MUST be scoped to the authenticated sender — trusting a payload field for identity is the attack-surface hole the spec's "Sender requirements" paragraph calls out. The key_id is the cryptographically verified identity; prefer ``sender_url:key_id`` when a sender URL is present to tolerate JWKS reuse across co-deployed senders. """ if self.sender_url is not None: return f"{self.sender_url}|{self.key_id}" return self.key_idIdentity string used to scope dedup state.
Webhook dedup MUST be scoped to the authenticated sender — trusting a payload field for identity is the attack-surface hole the spec's "Sender requirements" paragraph calls out. The key_id is the cryptographically verified identity; prefer
sender_url:key_idwhen a sender URL is present to tolerate JWKS reuse across co-deployed senders.
class WebhookDedupStore (backend: IdempotencyBackend,
ttl_seconds: int = 86400,
*,
namespace: str = 'webhook',
clock: Callable[[], float] = <built-in function time>)-
Expand source code
class WebhookDedupStore: """Dedup ``(sender_id, idempotency_key)`` pairs to suppress retried webhooks. :param backend: any :class:`IdempotencyBackend`. Same MemoryBackend or PgBackend type used by :class:`IdempotencyStore` is fine — the ``namespace`` parameter prefixes all sender IDs so request-side and webhook-side scopes can't alias even when sharing one backend instance. :param ttl_seconds: replay window. Must be within ``[86400, 604800]`` per the spec minimum. Defaults to 86400 (24h). :param namespace: prefix applied to every ``sender_id`` before it hits the backend. Defaults to ``"webhook"``, which is safe when the same backend is shared with :class:`IdempotencyStore` (request-side keys are scoped by a principal_id that isn't wrapped in this namespace, so collisions are impossible). Override only if you run multiple webhook scopes against one backend (e.g., separate dedup spaces for task webhooks vs list-change webhooks). """ def __init__( self, backend: IdempotencyBackend, ttl_seconds: int = _MIN_TTL_SECONDS, *, namespace: str = "webhook", clock: Callable[[], float] = time.time, ) -> None: if not _MIN_TTL_SECONDS <= ttl_seconds <= _MAX_TTL_SECONDS: raise ValueError( f"ttl_seconds must be in [{_MIN_TTL_SECONDS}, {_MAX_TTL_SECONDS}] " f"per webhook spec minimum, got {ttl_seconds}" ) if not namespace: raise ValueError("namespace must be a non-empty string") self.backend = backend self.ttl_seconds = ttl_seconds self.namespace = namespace self._clock = clock async def check_and_record(self, sender_id: str, idempotency_key: str) -> bool: """Atomically check for first-seen and record if new. Returns ``True`` when the pair is first-seen (event should be processed), ``False`` on duplicate (caller MUST still return 2xx to the sender — the event was delivered successfully, it's just a retry). Race note: the check-then-put pattern is not atomic across concurrent callers unless the backend provides its own atomicity. MemoryBackend serializes individual ``get`` and ``put`` under an ``asyncio.Lock`` but does NOT bracket them together — two concurrent retries of the same event CAN both observe "first-seen" and both process the event. That's a tolerable failure mode: the ultimate guarantee is "at most once per replay window in the common case"; a concurrent retry arriving in the same few milliseconds is rare and, if it happens, produces the same "duplicated side effect" outcome the at-least-once contract already warns callers to tolerate. PgBackend implementations SHOULD use ``INSERT ... ON CONFLICT DO NOTHING`` returning ``rowcount`` for lock-free atomicity. """ if not sender_id: raise ValueError("sender_id must be a non-empty string") if not idempotency_key: raise ValueError("idempotency_key must be a non-empty string") scoped_sender = f"{self.namespace}:{sender_id}" existing = await self.backend.get(scoped_sender, idempotency_key) if existing is not None: logger.debug( "webhook dedup: duplicate sender=%s key_prefix=%s", sender_id, idempotency_key[:8], ) return False entry = CachedResponse( payload_hash=_SENTINEL_HASH, response={}, expires_at_epoch=self._clock() + self.ttl_seconds, ) try: await self.backend.put(scoped_sender, idempotency_key, entry) except Exception: # Same fail-open reasoning as the request-side store: log and # process. Swallowing the put failure means this event MIGHT # reprocess on retry, not that we drop it. Better than raising, # which would look like handler failure to the sender. logger.warning( "webhook dedup put failed for sender=%s key_prefix=%s — " "event processed but next retry will reprocess", sender_id, idempotency_key[:8], exc_info=True, ) return TrueDedup
(sender_id, idempotency_key)pairs to suppress retried webhooks.:param backend: any :class:
IdempotencyBackend. Same MemoryBackend or PgBackend type used by :class:IdempotencyStoreis fine — thenamespaceparameter prefixes all sender IDs so request-side and webhook-side scopes can't alias even when sharing one backend instance. :param ttl_seconds: replay window. Must be within[86400, 604800]per the spec minimum. Defaults to 86400 (24h). :param namespace: prefix applied to everysender_idbefore it hits the backend. Defaults to"webhook", which is safe when the same backend is shared with :class:IdempotencyStore(request-side keys are scoped by a principal_id that isn't wrapped in this namespace, so collisions are impossible). Override only if you run multiple webhook scopes against one backend (e.g., separate dedup spaces for task webhooks vs list-change webhooks).Methods
async def check_and_record(self, sender_id: str, idempotency_key: str) ‑> bool-
Expand source code
async def check_and_record(self, sender_id: str, idempotency_key: str) -> bool: """Atomically check for first-seen and record if new. Returns ``True`` when the pair is first-seen (event should be processed), ``False`` on duplicate (caller MUST still return 2xx to the sender — the event was delivered successfully, it's just a retry). Race note: the check-then-put pattern is not atomic across concurrent callers unless the backend provides its own atomicity. MemoryBackend serializes individual ``get`` and ``put`` under an ``asyncio.Lock`` but does NOT bracket them together — two concurrent retries of the same event CAN both observe "first-seen" and both process the event. That's a tolerable failure mode: the ultimate guarantee is "at most once per replay window in the common case"; a concurrent retry arriving in the same few milliseconds is rare and, if it happens, produces the same "duplicated side effect" outcome the at-least-once contract already warns callers to tolerate. PgBackend implementations SHOULD use ``INSERT ... ON CONFLICT DO NOTHING`` returning ``rowcount`` for lock-free atomicity. """ if not sender_id: raise ValueError("sender_id must be a non-empty string") if not idempotency_key: raise ValueError("idempotency_key must be a non-empty string") scoped_sender = f"{self.namespace}:{sender_id}" existing = await self.backend.get(scoped_sender, idempotency_key) if existing is not None: logger.debug( "webhook dedup: duplicate sender=%s key_prefix=%s", sender_id, idempotency_key[:8], ) return False entry = CachedResponse( payload_hash=_SENTINEL_HASH, response={}, expires_at_epoch=self._clock() + self.ttl_seconds, ) try: await self.backend.put(scoped_sender, idempotency_key, entry) except Exception: # Same fail-open reasoning as the request-side store: log and # process. Swallowing the put failure means this event MIGHT # reprocess on retry, not that we drop it. Better than raising, # which would look like handler failure to the sender. logger.warning( "webhook dedup put failed for sender=%s key_prefix=%s — " "event processed but next retry will reprocess", sender_id, idempotency_key[:8], exc_info=True, ) return TrueAtomically check for first-seen and record if new.
Returns
Truewhen the pair is first-seen (event should be processed),Falseon duplicate (caller MUST still return 2xx to the sender — the event was delivered successfully, it's just a retry).Race note: the check-then-put pattern is not atomic across concurrent callers unless the backend provides its own atomicity. MemoryBackend serializes individual
getandputunder anasyncio.Lockbut does NOT bracket them together — two concurrent retries of the same event CAN both observe "first-seen" and both process the event. That's a tolerable failure mode: the ultimate guarantee is "at most once per replay window in the common case"; a concurrent retry arriving in the same few milliseconds is rare and, if it happens, produces the same "duplicated side effect" outcome the at-least-once contract already warns callers to tolerate. PgBackend implementations SHOULD useINSERT … ON CONFLICT DO NOTHINGreturningrowcountfor lock-free atomicity.
class WebhookDeliveryResult (status_code: int,
idempotency_key: str,
url: str,
response_headers: Mapping[str, str],
response_body: bytes,
sent_body: bytes = b'',
sent_extra_headers: Mapping[str, str] = <factory>)-
Expand source code
@dataclass(frozen=True) class WebhookDeliveryResult: """Outcome of one ``send_*`` call. Senders care about: did it land (``ok``), what key was used (for logs and retry), what did the receiver say (``status_code``, ``response_body``). The ``sent_body`` and ``sent_extra_headers`` fields capture exactly what was signed and POSTed — the sender's :meth:`WebhookSender.resend` replays them under a fresh signature (preserving ``idempotency_key`` for dedup) rather than re-serializing from a user-supplied dict, which would drift if any field (``timestamp``, nested ``result``) differs between calls. """ status_code: int idempotency_key: str url: str response_headers: Mapping[str, str] response_body: bytes sent_body: bytes = b"" sent_extra_headers: Mapping[str, str] = field(default_factory=dict) @property def ok(self) -> bool: """True on 2xx. Note: receivers MUST return 2xx on duplicates too, so a 200 with ``duplicate=true`` in the body is still ``ok``.""" return 200 <= self.status_code < 300Outcome of one
send_*call.Senders care about: did it land (
ok), what key was used (for logs and retry), what did the receiver say (status_code,response_body).The
sent_bodyandsent_extra_headersfields capture exactly what was signed and POSTed — the sender's :meth:WebhookSender.resend()replays them under a fresh signature (preservingidempotency_keyfor dedup) rather than re-serializing from a user-supplied dict, which would drift if any field (timestamp, nestedresult) differs between calls.Instance variables
var idempotency_key : strprop ok : bool-
Expand source code
@property def ok(self) -> bool: """True on 2xx. Note: receivers MUST return 2xx on duplicates too, so a 200 with ``duplicate=true`` in the body is still ``ok``.""" return 200 <= self.status_code < 300True on 2xx. Note: receivers MUST return 2xx on duplicates too, so a 200 with
duplicate=truein the body is stillok. var response_body : bytesvar response_headers : Mapping[str, str]var sent_body : bytesvar sent_extra_headers : Mapping[str, str]var status_code : intvar url : str
class WebhookOutcome (rejected: bool = False,
rejection_reason: RejectionReason | None = None,
response_headers: Mapping[str, str] = <factory>,
sender_identity: str | None = None,
payload: WebhookPayload | None = None,
duplicate: bool = False,
idempotency_key: str | None = None)-
Expand source code
@dataclass(frozen=True) class WebhookOutcome: """Result of a single ``receive`` call. Exactly one of ``rejected`` or ``payload`` is set. ``duplicate=True`` is compatible with a non-None ``payload`` — the payload parsed fine, the signature verified fine, it's just a retry the caller should 200 away. """ rejected: bool = False rejection_reason: RejectionReason | None = None response_headers: Mapping[str, str] = field(default_factory=dict) # Populated on successful verify (even when rejected downstream of crypto) sender_identity: str | None = None # Populated on successful verify + parse payload: WebhookPayload | None = None duplicate: bool = False idempotency_key: str | None = NoneResult of a single
receivecall.Exactly one of
rejectedorpayloadis set.duplicate=Trueis compatible with a non-Nonepayload— the payload parsed fine, the signature verified fine, it's just a retry the caller should 200 away.Instance variables
var duplicate : boolvar idempotency_key : str | Nonevar payload : McpWebhookPayload | RevocationNotification | CollectionListChangedWebhook | PropertyListChangedWebhook | ArtifactWebhookPayload | Nonevar rejected : boolvar rejection_reason : Literal['signature_missing', 'signature_invalid', 'signature_legacy_failed', 'content_type_invalid', 'body_invalid_json', 'payload_invalid', 'idempotency_key_missing', 'idempotency_key_invalid'] | Nonevar response_headers : Mapping[str, str]var sender_identity : str | None
class WebhookReceiver (config: WebhookReceiverConfig)-
Expand source code
class WebhookReceiver: """Stateless webhook entry point, one instance per receiver configuration. Instance state (``config``) is read-only after construction. Per-request state lives in the :class:`WebhookOutcome` returned from :meth:`receive`. """ def __init__(self, config: WebhookReceiverConfig) -> None: self._config = config async def receive( self, *, method: str, url: str, headers: Mapping[str, str], body: bytes, ) -> WebhookOutcome: """Verify, dedupe, parse. Returns a :class:`WebhookOutcome`. Never raises for sender-caused cryptographic or protocol failures — returns an outcome with ``rejected=True`` and populated ``response_headers`` so the caller can convert to an HTTP response without try/except around every call. Operational failures inside the dedup backend or verify-options factory MAY still raise; wrap the call if you need to 5xx cleanly on internal errors. """ if not _content_type_is_json(headers): return _reject("content_type_invalid", sender_identity=None) signer, rejection = await self._verify(method=method, url=url, headers=headers, body=body) if rejection is not None: return rejection assert signer is not None # verification succeeded sender_id = signer.as_sender_identity() try: payload_dict = json.loads(body) except json.JSONDecodeError: return _reject("body_invalid_json", sender_identity=sender_id) if not isinstance(payload_dict, dict): return _reject("body_invalid_json", sender_identity=sender_id) idempotency_key = payload_dict.get("idempotency_key") if not isinstance(idempotency_key, str) or not idempotency_key: # Spec 3.0-rc: idempotency_key is REQUIRED on every webhook payload. return _reject("idempotency_key_missing", sender_identity=sender_id) if not _IDEMPOTENCY_KEY_RE.match(idempotency_key): # Non-conformant format — charset or length out of bounds. return _reject("idempotency_key_invalid", sender_identity=sender_id) parsed = self._parse(payload_dict) if parsed is None: return _reject("payload_invalid", sender_identity=sender_id) is_first_seen = await self._config.dedup.check_and_record( sender_id=sender_id, idempotency_key=idempotency_key ) return WebhookOutcome( sender_identity=sender_id, payload=parsed, duplicate=not is_first_seen, idempotency_key=idempotency_key, ) def receive_sync( self, *, method: str, url: str, headers: Mapping[str, str], body: bytes, ) -> WebhookOutcome: """Synchronous wrapper around :meth:`receive` for WSGI-style frameworks. Use this from Flask, Gunicorn sync workers, ``http.server``, or any other sync-only HTTP entry point where wrapping every call in ``asyncio.run(...)`` is just noise:: @app.post("/webhooks/adcp") def hook(): outcome = receiver.receive_sync( method=request.method, url=request.url, headers=dict(request.headers), body=request.get_data(), ) ... Raises :class:`RuntimeError` if invoked from a thread that already has a running event loop — the underlying verify / dedup path is async and cannot be driven from inside an active loop without blocking it. From async code, call :meth:`receive` directly. """ try: asyncio.get_running_loop() except RuntimeError: # No running loop in this thread — safe to spin one up. return asyncio.run(self.receive(method=method, url=url, headers=headers, body=body)) raise RuntimeError( "WebhookReceiver.receive_sync() cannot be called from a running " "event loop. Use `await receiver.receive(...)` instead." ) async def _verify( self, *, method: str, url: str, headers: Mapping[str, str], body: bytes, ) -> tuple[VerifiedSignerLike | None, WebhookOutcome | None]: """Returns (signer, None) on success or (None, rejection_outcome).""" has_9421 = _has_9421_headers(headers) if has_9421: try: signer = verify_webhook_signature( method=method, url=url, headers=headers, body=body, options=self._config.verify_options, ) return signer, None except SignatureVerificationError as exc: # Downgrade defense: when 9421 IS present but fails, do NOT # consult HMAC fallback by default. A MITM that stripped a # valid 9421 signature and replaced it with a forged HMAC one # is exactly what the downgrade guard exists for. fallback = self._config.legacy_hmac allow_hmac = fallback is not None and not fallback.only_when_9421_absent if not allow_hmac: return None, WebhookOutcome( rejected=True, rejection_reason="signature_invalid", response_headers=_www_authenticate_header(exc.code), ) logger.warning( "9421 webhook verify failed (%s); trying HMAC legacy because " "legacy_hmac.only_when_9421_absent=False is set", exc.code, ) fallback = self._config.legacy_hmac if fallback is None: # No 9421 headers AND no HMAC fallback configured → spec says 9421 # is baseline-required in 3.0, so this is non-conformant. return None, WebhookOutcome( rejected=True, rejection_reason="signature_missing", response_headers=_www_authenticate_header("webhook_signature_required"), ) hmac_options = fallback.options_for(headers) if hmac_options is None: return None, WebhookOutcome( rejected=True, rejection_reason="signature_missing", response_headers=_www_authenticate_header("webhook_signature_required"), ) try: legacy_signer = verify_webhook_hmac(headers=headers, body=body, options=hmac_options) return legacy_signer, None except LegacyWebhookHmacError: return None, WebhookOutcome( rejected=True, rejection_reason="signature_legacy_failed", response_headers=_www_authenticate_header("webhook_signature_invalid"), ) def _parse(self, payload_dict: dict[str, Any]) -> WebhookPayload | None: model = _MODEL_BY_KIND[self._config.kind] try: return cast(WebhookPayload, model.model_validate(payload_dict)) except ValidationError as exc: # Operators need the field-level reason to diagnose sender bugs. # The receiver still returns payload_invalid downstream; this is # just observability. logger.warning( "webhook payload failed %s validation: %s", self._config.kind, exc.errors(include_url=False), ) return NoneStateless webhook entry point, one instance per receiver configuration.
Instance state (
config) is read-only after construction. Per-request state lives in the :class:WebhookOutcomereturned from :meth:receive.Methods
async def receive(self, *, method: str, url: str, headers: Mapping[str, str], body: bytes) ‑> WebhookOutcome-
Expand source code
async def receive( self, *, method: str, url: str, headers: Mapping[str, str], body: bytes, ) -> WebhookOutcome: """Verify, dedupe, parse. Returns a :class:`WebhookOutcome`. Never raises for sender-caused cryptographic or protocol failures — returns an outcome with ``rejected=True`` and populated ``response_headers`` so the caller can convert to an HTTP response without try/except around every call. Operational failures inside the dedup backend or verify-options factory MAY still raise; wrap the call if you need to 5xx cleanly on internal errors. """ if not _content_type_is_json(headers): return _reject("content_type_invalid", sender_identity=None) signer, rejection = await self._verify(method=method, url=url, headers=headers, body=body) if rejection is not None: return rejection assert signer is not None # verification succeeded sender_id = signer.as_sender_identity() try: payload_dict = json.loads(body) except json.JSONDecodeError: return _reject("body_invalid_json", sender_identity=sender_id) if not isinstance(payload_dict, dict): return _reject("body_invalid_json", sender_identity=sender_id) idempotency_key = payload_dict.get("idempotency_key") if not isinstance(idempotency_key, str) or not idempotency_key: # Spec 3.0-rc: idempotency_key is REQUIRED on every webhook payload. return _reject("idempotency_key_missing", sender_identity=sender_id) if not _IDEMPOTENCY_KEY_RE.match(idempotency_key): # Non-conformant format — charset or length out of bounds. return _reject("idempotency_key_invalid", sender_identity=sender_id) parsed = self._parse(payload_dict) if parsed is None: return _reject("payload_invalid", sender_identity=sender_id) is_first_seen = await self._config.dedup.check_and_record( sender_id=sender_id, idempotency_key=idempotency_key ) return WebhookOutcome( sender_identity=sender_id, payload=parsed, duplicate=not is_first_seen, idempotency_key=idempotency_key, )Verify, dedupe, parse. Returns a :class:
WebhookOutcome.Never raises for sender-caused cryptographic or protocol failures — returns an outcome with
rejected=Trueand populatedresponse_headersso the caller can convert to an HTTP response without try/except around every call. Operational failures inside the dedup backend or verify-options factory MAY still raise; wrap the call if you need to 5xx cleanly on internal errors. def receive_sync(self, *, method: str, url: str, headers: Mapping[str, str], body: bytes) ‑> WebhookOutcome-
Expand source code
def receive_sync( self, *, method: str, url: str, headers: Mapping[str, str], body: bytes, ) -> WebhookOutcome: """Synchronous wrapper around :meth:`receive` for WSGI-style frameworks. Use this from Flask, Gunicorn sync workers, ``http.server``, or any other sync-only HTTP entry point where wrapping every call in ``asyncio.run(...)`` is just noise:: @app.post("/webhooks/adcp") def hook(): outcome = receiver.receive_sync( method=request.method, url=request.url, headers=dict(request.headers), body=request.get_data(), ) ... Raises :class:`RuntimeError` if invoked from a thread that already has a running event loop — the underlying verify / dedup path is async and cannot be driven from inside an active loop without blocking it. From async code, call :meth:`receive` directly. """ try: asyncio.get_running_loop() except RuntimeError: # No running loop in this thread — safe to spin one up. return asyncio.run(self.receive(method=method, url=url, headers=headers, body=body)) raise RuntimeError( "WebhookReceiver.receive_sync() cannot be called from a running " "event loop. Use `await receiver.receive(...)` instead." )Synchronous wrapper around :meth:
receivefor WSGI-style frameworks.Use this from Flask, Gunicorn sync workers,
http.server, or any other sync-only HTTP entry point where wrapping every call inasyncio.run(…)is just noise::@app.post("/webhooks/adcp") def hook(): outcome = receiver.receive_sync( method=request.method, url=request.url, headers=dict(request.headers), body=request.get_data(), ) ...Raises :class:
RuntimeErrorif invoked from a thread that already has a running event loop — the underlying verify / dedup path is async and cannot be driven from inside an active loop without blocking it. From async code, call :meth:receivedirectly.
class WebhookReceiverConfig (verify_options: WebhookVerifyOptions,
dedup: WebhookDedupStore,
legacy_hmac: LegacyHmacFallback | None = None,
kind: WebhookKind = 'mcp')-
Expand source code
@dataclass(frozen=True) class WebhookReceiverConfig: """Configuration bundle. :param verify_options: verifier configuration (JWKS, replay store, etc.). A single instance is reused for every request — the verifier stamps ``now`` itself via ``verify_options.clock()``, so there's no need to refresh a time field per request. :param dedup: webhook-dedup store. :param legacy_hmac: optional HMAC-SHA256 fallback for 3.x migration. :param kind: which webhook payload type to parse into. Default ``"mcp"`` (the task-status webhook that dominates most integrations); pass explicitly for list-change / artifact / revocation receivers. """ verify_options: WebhookVerifyOptions dedup: WebhookDedupStore legacy_hmac: LegacyHmacFallback | None = None kind: WebhookKind = "mcp"Configuration bundle.
:param verify_options: verifier configuration (JWKS, replay store, etc.). A single instance is reused for every request — the verifier stamps
nowitself viaverify_options.clock(), so there's no need to refresh a time field per request. :param dedup: webhook-dedup store. :param legacy_hmac: optional HMAC-SHA256 fallback for 3.x migration. :param kind: which webhook payload type to parse into. Default"mcp"(the task-status webhook that dominates most integrations); pass explicitly for list-change / artifact / revocation receivers.Instance variables
var dedup : WebhookDedupStorevar kind : Literal['mcp', 'revocation_notification', 'collection_list_changed', 'property_list_changed', 'artifact']var legacy_hmac : LegacyHmacFallback | Nonevar verify_options : WebhookVerifyOptions
class WebhookSender (*,
private_key: PrivateKey,
key_id: str,
alg: str,
client: httpx.AsyncClient | None = None,
timeout_seconds: float = 10.0)-
Expand source code
class WebhookSender: """Outbound signed-webhook delivery client. Owns one webhook-signing private key. Reuses a single :class:`httpx.AsyncClient` across requests for connection pooling — pass your own via ``client=`` if you want to share it with other SDK surfaces. Thread/task safety: safe to call concurrent ``send_*`` from many asyncio tasks. The underlying ``httpx.AsyncClient`` manages its own pool. """ def __init__( self, *, private_key: PrivateKey, key_id: str, alg: str, client: httpx.AsyncClient | None = None, timeout_seconds: float = _DEFAULT_TIMEOUT_SECONDS, ) -> None: self._private_key = private_key self._key_id = key_id self._alg = alg self._timeout = timeout_seconds self._client = client self._owns_client = client is None @classmethod def from_jwk( cls, jwk: Mapping[str, Any], *, d_field: str = "d", client: httpx.AsyncClient | None = None, timeout_seconds: float = _DEFAULT_TIMEOUT_SECONDS, ) -> WebhookSender: """Construct from a JWK that includes the private scalar. The JWK MUST have ``adcp_use == "webhook-signing"`` — the sender doesn't validate this (you're signing with your own key; validation happens at the receiver), but a key whose adcp_use is wrong will be rejected by every conformant verifier. """ # Snapshot the mapping once — a live Mapping could otherwise return # different values across the adcp_use / kid / d / alg reads. jwk_snapshot = dict(jwk) if jwk_snapshot.get("adcp_use") != "webhook-signing": raise ValueError( f"WebhookSender requires a JWK with adcp_use='webhook-signing' " f"(got {jwk_snapshot.get('adcp_use')!r}). Webhook-signing and " f"request-signing keys MUST be distinct so a signature from one " f"surface cannot be replayed as the other. Generate a separate " f"key with adcp_use='webhook-signing' and publish it in your " f"adagents.json alongside your request-signing key. See " f"https://adcontextprotocol.org/docs/building/implementation/security" ) alg = jwk_snapshot.get("alg") if alg == "EdDSA": alg = "ed25519" elif alg == "ES256": alg = "ecdsa-p256-sha256" if alg not in ("ed25519", "ecdsa-p256-sha256"): raise ValueError(f"unsupported JWK alg {jwk_snapshot.get('alg')!r}") private_key = private_key_from_jwk(jwk_snapshot, d_field=d_field) return cls( private_key=private_key, key_id=str(jwk_snapshot["kid"]), alg=alg, client=client, timeout_seconds=timeout_seconds, ) @classmethod def from_pem( cls, pem_path: str | Path | bytes, *, key_id: str, alg: str = "ed25519", passphrase: bytes | None = None, client: httpx.AsyncClient | None = None, timeout_seconds: float = _DEFAULT_TIMEOUT_SECONDS, ) -> WebhookSender: """Load a private key from a PEM file and bind it as a webhook sender. Companion to ``adcp-keygen --purpose webhook-signing``, which writes the PEM and prints the public JWK. The JWK is published at your ``jwks_uri``; the PEM holds the private key material. ``from_pem`` reads the PEM, constructs the right ``PrivateKey`` type for ``alg``, and returns a sender ready to send. Args: pem_path: Path to the PKCS#8 PEM, or the PEM bytes directly. key_id: JWK ``kid`` claim — must match the published JWK. alg: Signature algorithm. ``ed25519`` (default) or ``es256``. Also accepts the RFC 9421 form ``ecdsa-p256-sha256``. passphrase: Required if the PEM is encrypted (``adcp-keygen --encrypt``). client: Optional pre-built :class:`httpx.AsyncClient` to share across the SDK; the sender owns its own client when omitted. timeout_seconds: Per-request timeout for the owned client. Raises: ValueError: ``alg`` is not ed25519 / es256, or the PEM contains a key whose type doesn't match ``alg``. """ if alg in ("es256", "ES256"): alg = ALG_ES256 elif alg == "EdDSA": alg = ALG_ED25519 if alg not in (ALG_ED25519, ALG_ES256): raise ValueError( f"unsupported alg {alg!r} — use 'ed25519' or 'es256' " f"(the two AdCP webhook-signing algorithms)" ) if isinstance(pem_path, bytes): pem_bytes = pem_path else: pem_bytes = Path(pem_path).read_bytes() private_key = load_private_key_pem(pem_bytes, password=passphrase) # The PEM's key type must match the requested alg — mixing them # would produce signatures no verifier can validate, and the # resulting error at delivery time would point at the receiver. # Fail here so the misconfiguration surfaces at construction. if alg == ALG_ED25519 and not isinstance(private_key, ed25519.Ed25519PrivateKey): raise ValueError( f"PEM holds a {type(private_key).__name__} but alg='ed25519' " f"was requested. Re-run adcp-keygen with --alg ed25519, or " f"pass alg='es256' to match the existing PEM." ) if alg == ALG_ES256 and not isinstance(private_key, ec.EllipticCurvePrivateKey): raise ValueError( f"PEM holds a {type(private_key).__name__} but alg='es256' " f"was requested. Re-run adcp-keygen with --alg es256, or " f"pass alg='ed25519' to match the existing PEM." ) return cls( private_key=private_key, key_id=key_id, alg=alg, client=client, timeout_seconds=timeout_seconds, ) def __repr__(self) -> str: # Explicit repr so no future debug helper or error traceback auto- # renders self.__dict__ and pulls the private key into logs. return f"WebhookSender(key_id={self._key_id!r}, alg={self._alg!r})" async def aclose(self) -> None: """Close the internal httpx client if we own it.""" if self._owns_client and self._client is not None: await self._client.aclose() self._client = None async def __aenter__(self) -> WebhookSender: await self._get_client() return self async def __aexit__(self, *args: Any) -> None: await self.aclose() async def _get_client(self) -> httpx.AsyncClient: if self._client is None: self._client = httpx.AsyncClient(timeout=self._timeout) return self._client async def send_mcp( self, *, url: str, task_id: str, status: GeneratedTaskStatus | str, task_type: str | None = None, result: AdcpAsyncResponseData | dict[str, Any] | None = None, timestamp: datetime | None = None, operation_id: str | None = None, message: str | None = None, context_id: str | None = None, domain: str | None = None, idempotency_key: str | None = None, extra_headers: Mapping[str, str] | None = None, ) -> WebhookDeliveryResult: """POST a signed MCP-style task-status webhook. On retry, prefer :meth:`resend` over calling this again — ``resend`` replays the exact same bytes, whereas re-invoking ``send_mcp`` with the "same" args would produce a fresh ``timestamp`` and potentially a different serialized body, which the receiver would dedupe but with different observed payload data. """ payload = create_mcp_webhook_payload( task_id=task_id, status=status, task_type=task_type, result=result, timestamp=timestamp, operation_id=operation_id, message=message, context_id=context_id, domain=domain, idempotency_key=idempotency_key, ) return await self.send_raw( url=url, idempotency_key=str(payload["idempotency_key"]), payload=payload, extra_headers=extra_headers, ) async def send_revocation_notification( self, *, url: str, rights_id: str, brand_id: str, reason: str, effective_at: str, idempotency_key: str | None = None, extra_headers: Mapping[str, str] | None = None, ) -> WebhookDeliveryResult: """POST a signed rights-revocation notification.""" key = idempotency_key or generate_webhook_idempotency_key() payload: dict[str, Any] = { "idempotency_key": key, "rights_id": rights_id, "brand_id": brand_id, "reason": reason, "effective_at": effective_at, } return await self.send_raw( url=url, idempotency_key=key, payload=payload, extra_headers=extra_headers ) async def send_artifact_webhook( self, *, url: str, media_buy_id: str, batch_id: str, timestamp: str, artifacts: list[dict[str, Any]], idempotency_key: str | None = None, extra_headers: Mapping[str, str] | None = None, ) -> WebhookDeliveryResult: """POST a signed content-standards artifact webhook.""" key = idempotency_key or generate_webhook_idempotency_key() payload: dict[str, Any] = { "idempotency_key": key, "media_buy_id": media_buy_id, "batch_id": batch_id, "timestamp": timestamp, "artifacts": artifacts, } return await self.send_raw( url=url, idempotency_key=key, payload=payload, extra_headers=extra_headers ) async def send_collection_list_changed( self, *, url: str, list_id: str, resolved_at: str, signature: str, idempotency_key: str | None = None, extra_headers: Mapping[str, str] | None = None, ) -> WebhookDeliveryResult: """POST a signed governance collection-list-changed webhook. ``signature`` is the payload-level signature field that predates 9421 webhook transport signing — it remains required by the schema. The 9421 signature this method adds protects the transport envelope. """ key = idempotency_key or generate_webhook_idempotency_key() payload: dict[str, Any] = { "idempotency_key": key, "event": "collection_list_changed", "list_id": list_id, "resolved_at": resolved_at, "signature": signature, } return await self.send_raw( url=url, idempotency_key=key, payload=payload, extra_headers=extra_headers ) async def send_property_list_changed( self, *, url: str, list_id: str, resolved_at: str, signature: str, idempotency_key: str | None = None, extra_headers: Mapping[str, str] | None = None, ) -> WebhookDeliveryResult: """POST a signed governance property-list-changed webhook.""" key = idempotency_key or generate_webhook_idempotency_key() payload: dict[str, Any] = { "idempotency_key": key, "event": "property_list_changed", "list_id": list_id, "resolved_at": resolved_at, "signature": signature, } return await self.send_raw( url=url, idempotency_key=key, payload=payload, extra_headers=extra_headers ) async def send_raw( self, *, url: str, idempotency_key: str, payload: dict[str, Any], extra_headers: Mapping[str, str] | None = None, ) -> WebhookDeliveryResult: """Low-level escape hatch: sign + POST an arbitrary payload. The ``idempotency_key`` kwarg is required and is injected into the payload before signing — the visible signature makes the contract impossible to forget, unlike a runtime dict check. If ``payload`` already carries an ``idempotency_key``, the kwarg wins so the two cannot disagree. """ if not isinstance(idempotency_key, str) or not idempotency_key: raise ValueError("idempotency_key must be a non-empty string") body_dict = {**payload, "idempotency_key": idempotency_key} # Byte-exact serialization — this is the ONLY representation that # gets signed AND posted. Do not allow an httpx `json=` path anywhere # in the stack because it would reserialize and break the digest. body = json.dumps(body_dict).encode("utf-8") if len(body) > _MAX_BODY_BYTES: raise ValueError( f"serialized webhook body is {len(body):,} bytes, over the " f"{_MAX_BODY_BYTES:,}-byte cap. Split into smaller webhooks " "or use batch-reporting endpoints." ) return await self._send_bytes( url=url, body=body, idempotency_key=idempotency_key, extra_headers=extra_headers, ) async def resend(self, result: WebhookDeliveryResult) -> WebhookDeliveryResult: """Replay an earlier delivery under a fresh signature. The bytes are identical (same ``idempotency_key``, same payload fields, same serialization) — only the Signature / Signature-Input / Content-Digest headers are regenerated. The receiver dedupes via ``idempotency_key``, so the replayed event is a spec-correct retry that won't cause double-processing. """ if not result.sent_body: raise ValueError( "cannot resend: result has no captured sent_body (likely constructed " "externally). Call a send_* method on this sender first." ) return await self._send_bytes( url=result.url, body=result.sent_body, idempotency_key=result.idempotency_key, extra_headers=result.sent_extra_headers or None, ) async def _send_bytes( self, *, url: str, body: bytes, idempotency_key: str, extra_headers: Mapping[str, str] | None, ) -> WebhookDeliveryResult: """Sign + POST a pre-serialized body. Shared by send_raw and resend.""" base_headers = {"Content-Type": "application/json"} signed = sign_webhook( method="POST", url=url, headers=base_headers, body=body, private_key=self._private_key, key_id=self._key_id, alg=self._alg, ) headers: dict[str, str] = {**base_headers, **signed.as_dict()} if extra_headers: # Pre-scan so a bad extra_header doesn't leave half-merged state. reserved = {"signature", "signature-input", "content-digest", "content-type"} for k in extra_headers: if str(k).lower() in reserved: raise ValueError( f"extra_headers may not override signature-binding or " f"content-type header {k!r}" ) for k, v in extra_headers.items(): headers[k] = v client = await self._get_client() response = await client.post(url, content=body, headers=headers) return WebhookDeliveryResult( status_code=response.status_code, idempotency_key=idempotency_key, url=url, response_headers=dict(response.headers), response_body=response.content, sent_body=body, sent_extra_headers=dict(extra_headers) if extra_headers else {}, )Outbound signed-webhook delivery client.
Owns one webhook-signing private key. Reuses a single :class:
httpx.AsyncClientacross requests for connection pooling — pass your own viaclient=if you want to share it with other SDK surfaces.Thread/task safety: safe to call concurrent
send_*from many asyncio tasks. The underlyinghttpx.AsyncClientmanages its own pool.Static methods
def from_jwk(jwk: Mapping[str, Any],
*,
d_field: str = 'd',
client: httpx.AsyncClient | None = None,
timeout_seconds: float = 10.0) ‑> WebhookSender-
Construct from a JWK that includes the private scalar.
The JWK MUST have
adcp_use == "webhook-signing"— the sender doesn't validate this (you're signing with your own key; validation happens at the receiver), but a key whose adcp_use is wrong will be rejected by every conformant verifier. def from_pem(pem_path: str | Path | bytes,
*,
key_id: str,
alg: str = 'ed25519',
passphrase: bytes | None = None,
client: httpx.AsyncClient | None = None,
timeout_seconds: float = 10.0) ‑> WebhookSender-
Load a private key from a PEM file and bind it as a webhook sender.
Companion to
adcp-keygen --purpose webhook-signing, which writes the PEM and prints the public JWK. The JWK is published at yourjwks_uri; the PEM holds the private key material.from_pemreads the PEM, constructs the rightPrivateKeytype foralg, and returns a sender ready to send.Args
pem_path- Path to the PKCS#8 PEM, or the PEM bytes directly.
key_id- JWK
kidclaim — must match the published JWK. alg- Signature algorithm.
ed25519(default) ores256. Also accepts the RFC 9421 formecdsa-p256-sha256. passphrase- Required if the PEM is encrypted
(
adcp-keygen --encrypt). client- Optional pre-built :class:
httpx.AsyncClientto share across the SDK; the sender owns its own client when omitted. timeout_seconds- Per-request timeout for the owned client.
Raises
ValueErroralgis not ed25519 / es256, or the PEM contains a key whose type doesn't matchalg.
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: """Close the internal httpx client if we own it.""" if self._owns_client and self._client is not None: await self._client.aclose() self._client = NoneClose the internal httpx client if we own it.
async def resend(self,
result: WebhookDeliveryResult) ‑> WebhookDeliveryResult-
Expand source code
async def resend(self, result: WebhookDeliveryResult) -> WebhookDeliveryResult: """Replay an earlier delivery under a fresh signature. The bytes are identical (same ``idempotency_key``, same payload fields, same serialization) — only the Signature / Signature-Input / Content-Digest headers are regenerated. The receiver dedupes via ``idempotency_key``, so the replayed event is a spec-correct retry that won't cause double-processing. """ if not result.sent_body: raise ValueError( "cannot resend: result has no captured sent_body (likely constructed " "externally). Call a send_* method on this sender first." ) return await self._send_bytes( url=result.url, body=result.sent_body, idempotency_key=result.idempotency_key, extra_headers=result.sent_extra_headers or None, )Replay an earlier delivery under a fresh signature.
The bytes are identical (same
idempotency_key, same payload fields, same serialization) — only the Signature / Signature-Input / Content-Digest headers are regenerated. The receiver dedupes viaidempotency_key, so the replayed event is a spec-correct retry that won't cause double-processing. async def send_artifact_webhook(self,
*,
url: str,
media_buy_id: str,
batch_id: str,
timestamp: str,
artifacts: list[dict[str, Any]],
idempotency_key: str | None = None,
extra_headers: Mapping[str, str] | None = None) ‑> WebhookDeliveryResult-
Expand source code
async def send_artifact_webhook( self, *, url: str, media_buy_id: str, batch_id: str, timestamp: str, artifacts: list[dict[str, Any]], idempotency_key: str | None = None, extra_headers: Mapping[str, str] | None = None, ) -> WebhookDeliveryResult: """POST a signed content-standards artifact webhook.""" key = idempotency_key or generate_webhook_idempotency_key() payload: dict[str, Any] = { "idempotency_key": key, "media_buy_id": media_buy_id, "batch_id": batch_id, "timestamp": timestamp, "artifacts": artifacts, } return await self.send_raw( url=url, idempotency_key=key, payload=payload, extra_headers=extra_headers )POST a signed content-standards artifact webhook.
async def send_collection_list_changed(self,
*,
url: str,
list_id: str,
resolved_at: str,
signature: str,
idempotency_key: str | None = None,
extra_headers: Mapping[str, str] | None = None) ‑> WebhookDeliveryResult-
Expand source code
async def send_collection_list_changed( self, *, url: str, list_id: str, resolved_at: str, signature: str, idempotency_key: str | None = None, extra_headers: Mapping[str, str] | None = None, ) -> WebhookDeliveryResult: """POST a signed governance collection-list-changed webhook. ``signature`` is the payload-level signature field that predates 9421 webhook transport signing — it remains required by the schema. The 9421 signature this method adds protects the transport envelope. """ key = idempotency_key or generate_webhook_idempotency_key() payload: dict[str, Any] = { "idempotency_key": key, "event": "collection_list_changed", "list_id": list_id, "resolved_at": resolved_at, "signature": signature, } return await self.send_raw( url=url, idempotency_key=key, payload=payload, extra_headers=extra_headers )POST a signed governance collection-list-changed webhook.
signatureis the payload-level signature field that predates 9421 webhook transport signing — it remains required by the schema. The 9421 signature this method adds protects the transport envelope. async def send_mcp(self,
*,
url: str,
task_id: str,
status: GeneratedTaskStatus | str,
task_type: str | None = None,
result: AdcpAsyncResponseData | dict[str, Any] | None = None,
timestamp: datetime | None = None,
operation_id: str | None = None,
message: str | None = None,
context_id: str | None = None,
domain: str | None = None,
idempotency_key: str | None = None,
extra_headers: Mapping[str, str] | None = None) ‑> WebhookDeliveryResult-
Expand source code
async def send_mcp( self, *, url: str, task_id: str, status: GeneratedTaskStatus | str, task_type: str | None = None, result: AdcpAsyncResponseData | dict[str, Any] | None = None, timestamp: datetime | None = None, operation_id: str | None = None, message: str | None = None, context_id: str | None = None, domain: str | None = None, idempotency_key: str | None = None, extra_headers: Mapping[str, str] | None = None, ) -> WebhookDeliveryResult: """POST a signed MCP-style task-status webhook. On retry, prefer :meth:`resend` over calling this again — ``resend`` replays the exact same bytes, whereas re-invoking ``send_mcp`` with the "same" args would produce a fresh ``timestamp`` and potentially a different serialized body, which the receiver would dedupe but with different observed payload data. """ payload = create_mcp_webhook_payload( task_id=task_id, status=status, task_type=task_type, result=result, timestamp=timestamp, operation_id=operation_id, message=message, context_id=context_id, domain=domain, idempotency_key=idempotency_key, ) return await self.send_raw( url=url, idempotency_key=str(payload["idempotency_key"]), payload=payload, extra_headers=extra_headers, )POST a signed MCP-style task-status webhook.
On retry, prefer :meth:
resendover calling this again —resendreplays the exact same bytes, whereas re-invokingsend_mcpwith the "same" args would produce a freshtimestampand potentially a different serialized body, which the receiver would dedupe but with different observed payload data. async def send_property_list_changed(self,
*,
url: str,
list_id: str,
resolved_at: str,
signature: str,
idempotency_key: str | None = None,
extra_headers: Mapping[str, str] | None = None) ‑> WebhookDeliveryResult-
Expand source code
async def send_property_list_changed( self, *, url: str, list_id: str, resolved_at: str, signature: str, idempotency_key: str | None = None, extra_headers: Mapping[str, str] | None = None, ) -> WebhookDeliveryResult: """POST a signed governance property-list-changed webhook.""" key = idempotency_key or generate_webhook_idempotency_key() payload: dict[str, Any] = { "idempotency_key": key, "event": "property_list_changed", "list_id": list_id, "resolved_at": resolved_at, "signature": signature, } return await self.send_raw( url=url, idempotency_key=key, payload=payload, extra_headers=extra_headers )POST a signed governance property-list-changed webhook.
async def send_raw(self,
*,
url: str,
idempotency_key: str,
payload: dict[str, Any],
extra_headers: Mapping[str, str] | None = None) ‑> WebhookDeliveryResult-
Expand source code
async def send_raw( self, *, url: str, idempotency_key: str, payload: dict[str, Any], extra_headers: Mapping[str, str] | None = None, ) -> WebhookDeliveryResult: """Low-level escape hatch: sign + POST an arbitrary payload. The ``idempotency_key`` kwarg is required and is injected into the payload before signing — the visible signature makes the contract impossible to forget, unlike a runtime dict check. If ``payload`` already carries an ``idempotency_key``, the kwarg wins so the two cannot disagree. """ if not isinstance(idempotency_key, str) or not idempotency_key: raise ValueError("idempotency_key must be a non-empty string") body_dict = {**payload, "idempotency_key": idempotency_key} # Byte-exact serialization — this is the ONLY representation that # gets signed AND posted. Do not allow an httpx `json=` path anywhere # in the stack because it would reserialize and break the digest. body = json.dumps(body_dict).encode("utf-8") if len(body) > _MAX_BODY_BYTES: raise ValueError( f"serialized webhook body is {len(body):,} bytes, over the " f"{_MAX_BODY_BYTES:,}-byte cap. Split into smaller webhooks " "or use batch-reporting endpoints." ) return await self._send_bytes( url=url, body=body, idempotency_key=idempotency_key, extra_headers=extra_headers, )Low-level escape hatch: sign + POST an arbitrary payload.
The
idempotency_keykwarg is required and is injected into the payload before signing — the visible signature makes the contract impossible to forget, unlike a runtime dict check. Ifpayloadalready carries anidempotency_key, the kwarg wins so the two cannot disagree. async def send_revocation_notification(self,
*,
url: str,
rights_id: str,
brand_id: str,
reason: str,
effective_at: str,
idempotency_key: str | None = None,
extra_headers: Mapping[str, str] | None = None) ‑> WebhookDeliveryResult-
Expand source code
async def send_revocation_notification( self, *, url: str, rights_id: str, brand_id: str, reason: str, effective_at: str, idempotency_key: str | None = None, extra_headers: Mapping[str, str] | None = None, ) -> WebhookDeliveryResult: """POST a signed rights-revocation notification.""" key = idempotency_key or generate_webhook_idempotency_key() payload: dict[str, Any] = { "idempotency_key": key, "rights_id": rights_id, "brand_id": brand_id, "reason": reason, "effective_at": effective_at, } return await self.send_raw( url=url, idempotency_key=key, payload=payload, extra_headers=extra_headers )POST a signed rights-revocation notification.
class WebhookVerifyOptions (*,
jwks_resolver: JwksResolver,
replay_store: ReplayStore | None = None,
revocation_checker: RevocationChecker | None = None,
revocation_list: RevocationList | None = None,
max_skew_seconds: int = 60,
max_window_seconds: int = 300,
label: str = 'sig1',
allowed_algs: frozenset[str] = frozenset({'ed25519', 'ecdsa-p256-sha256'}),
sender_url: str | None = None,
clock: Callable[[], float] = <built-in function time>)-
Expand source code
@dataclass(frozen=True, kw_only=True) class WebhookVerifyOptions: """Options for the webhook verifier. Subset of :class:`VerifyOptions` — several fields are pinned (tag, adcp_use, content-digest policy) because the webhook profile doesn't leave them as caller choices. Unlike the request verifier, there is no ``now`` field — the webhook verifier stamps time-of-check itself, so the same :class:`WebhookVerifyOptions` instance can live for the lifetime of your receiver without a factory closure around it. Override via ``clock=`` for deterministic tests. """ jwks_resolver: JwksResolver replay_store: ReplayStore | None = None revocation_checker: RevocationChecker | None = None revocation_list: RevocationList | None = None max_skew_seconds: int = DEFAULT_SKEW_SECONDS max_window_seconds: int = MAX_WINDOW_SECONDS label: str = SIG_LABEL_DEFAULT allowed_algs: frozenset[str] = ALLOWED_ALGS sender_url: str | None = None clock: Callable[[], float] = time.timeOptions for the webhook verifier.
Subset of :class:
VerifyOptions— several fields are pinned (tag, adcp_use, content-digest policy) because the webhook profile doesn't leave them as caller choices.Unlike the request verifier, there is no
nowfield — the webhook verifier stamps time-of-check itself, so the same :class:WebhookVerifyOptionsinstance can live for the lifetime of your receiver without a factory closure around it. Override viaclock=for deterministic tests.Instance variables
var allowed_algs : frozenset[str]var jwks_resolver : JwksResolvervar label : strvar max_skew_seconds : intvar max_window_seconds : intvar replay_store : ReplayStore | Nonevar revocation_checker : RevocationChecker | Nonevar revocation_list : RevocationList | Nonevar sender_url : str | None
Methods
def clock(...) ‑> Callable[[], float]-
time() -> floating point number
Return the current time in seconds since the Epoch. Fractions of a second may be present if the system clock provides them.