Module adcp.webhook_receiver

One-call webhook receiver: verify signature, dedupe, parse.

Packages the three things every AdCP webhook receiver has to do so callers don't re-read the five-point normative checklist in webhooks.mdx:

  1. Verify the RFC 9421 signature (or fall back to HMAC-SHA256 when the buyer explicitly opts in for 3.x migration).
  2. Dedupe on (authenticated_sender_identity, idempotency_key) with a 24h+ TTL. Duplicates are a no-op, not an error.
  3. Parse the body into the right typed payload so the caller gets a McpWebhookPayload / RevocationNotification / etc. back.
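
The dedupe rule in step 2 can be sketched with a small in-memory store. This is a minimal illustration, not the library's WebhookDedupStore: the class name `InMemoryDedup`, its injectable `clock`, and its synchronous method are assumptions made so the example runs on its own.

```python
import time
from typing import Callable, Dict, Tuple


class InMemoryDedup:
    """Hypothetical stand-in for a dedup store (not adcp's WebhookDedupStore)."""

    def __init__(
        self,
        ttl_seconds: float = 86400,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        # (sender_id, idempotency_key) -> time at which the entry expires
        self._seen: Dict[Tuple[str, str], float] = {}

    def check_and_record(self, sender_id: str, idempotency_key: str) -> bool:
        """Return True on first sight, False for a duplicate inside the TTL."""
        now = self._clock()
        key = (sender_id, idempotency_key)
        expiry = self._seen.get(key)
        if expiry is not None and expiry > now:
            return False  # duplicate: a no-op for the caller, not an error
        self._seen[key] = now + self._ttl
        return True
```

Keying on the pair rather than on the idempotency key alone means two different senders can reuse the same key without colliding.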

Usage:

from adcp.webhooks import (
    WebhookReceiver,
    WebhookReceiverConfig,
    WebhookVerifyOptions,
)
from adcp.server.idempotency import MemoryBackend, WebhookDedupStore

receiver = WebhookReceiver(
    config=WebhookReceiverConfig(
        verify_options=WebhookVerifyOptions(
            jwks_resolver=my_jwks_resolver,
            replay_store=my_replay_store,
        ),
        dedup=WebhookDedupStore(MemoryBackend(), ttl_seconds=86400),
    ),
)

@app.post("/webhooks/adcp")
async def hook(request):
    outcome = await receiver.receive(
        method=request.method,
        url=str(request.url),
        headers=dict(request.headers),
        body=await request.body(),
    )
    if outcome.rejected:
        return Response(status_code=401, headers=outcome.response_headers)
    # MUST return 2xx on duplicates — the sender interprets non-2xx as
    # delivery failure and retries.
    if outcome.duplicate:
        return Response(status_code=200)
    await process(outcome.payload)
    return Response(status_code=200)

Classes

class LegacyHmacFallback (options_for: Callable[[Mapping[str, str]], LegacyWebhookHmacOptions | None],
only_when_9421_absent: bool = True)
@dataclass(frozen=True)
class LegacyHmacFallback:
    """Opt-in policy for accepting HMAC-SHA256 senders during 3.x migration.

    The default behavior of the receiver is to reject any request that fails
    9421 verification. Pass an instance of this class to ``WebhookReceiverConfig``
    to accept HMAC-signed webhooks as a fallback.

    :param options_for: callback that returns a populated
        :class:`LegacyWebhookHmacOptions` given the incoming request headers.
        Your implementation resolves the sender (from Bearer, hostname, or
        legacy shared-secret tag) and returns the secret + sender_identity
        tuple the verifier needs. Return ``None`` to decline the fallback
        for this request (rejection follows the 9421-only failure path).
    :param only_when_9421_absent: when ``True`` (default), HMAC fallback only
        fires when no 9421 headers are present at all. When a request carries
        9421 headers that FAIL verification, it still rejects — preventing a
        downgrade attack where a MITM strips the 9421 signature and replaces
        it with a forged HMAC one it knows the secret for. When ``False``,
        HMAC is tried on any 9421 failure; only set this for testing or known
        homogenous sender cohorts.
    """

    options_for: Callable[[Mapping[str, str]], LegacyWebhookHmacOptions | None]
    only_when_9421_absent: bool = True

    @classmethod
    def from_shared_secret(
        cls,
        *,
        secret: bytes,
        sender_identity: str,
        only_when_9421_absent: bool = True,
        window_seconds: int = 300,
    ) -> LegacyHmacFallback:
        """Convenience constructor for the "one secret, one sender" case.

        Covers the common 3.x migration setup where the receiver has exactly
        one publisher on the legacy scheme and binds them to a known
        ``sender_identity`` (typically a buyer-defined string). For multi-
        sender or header-derived-identity setups, construct with an
        ``options_for`` callback directly.
        """
        import time as _time

        def _options_for(_headers: Mapping[str, str]) -> LegacyWebhookHmacOptions:
            return LegacyWebhookHmacOptions(
                secret=secret,
                sender_identity=sender_identity,
                now=_time.time(),
                window_seconds=window_seconds,
            )

        return cls(
            options_for=_options_for,
            only_when_9421_absent=only_when_9421_absent,
        )

Opt-in policy for accepting HMAC-SHA256 senders during 3.x migration.

The default behavior of the receiver is to reject any request that fails 9421 verification. Pass an instance of this class to WebhookReceiverConfig to accept HMAC-signed webhooks as a fallback.

:param options_for: callback that returns a populated LegacyWebhookHmacOptions given the incoming request headers. Your implementation resolves the sender (from Bearer, hostname, or legacy shared-secret tag) and returns the secret + sender_identity tuple the verifier needs. Return None to decline the fallback for this request (rejection follows the 9421-only failure path).

:param only_when_9421_absent: when True (default), HMAC fallback only fires when no 9421 headers are present at all. A request that carries 9421 headers that FAIL verification is still rejected, which prevents a downgrade attack where a MITM strips the 9421 signature and replaces it with a forged HMAC one it knows the secret for. When False, HMAC is tried on any 9421 failure; only set this for testing or known homogenous sender cohorts.
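
The interaction between 9421 presence and only_when_9421_absent can be summarized as a small decision function. This is a sketch of the branching visible in the `_verify` source further down, reduced to booleans; the function name `signature_path` and its return labels are illustrative and not part of the library.

```python
from typing import Literal

Decision = Literal["accept_9421", "reject", "try_hmac"]


def signature_path(
    *,
    has_9421_headers: bool,
    verified_9421: bool,
    fallback_configured: bool,
    only_when_9421_absent: bool = True,
) -> Decision:
    """Decide which verification path a request takes (illustrative only)."""
    if has_9421_headers:
        if verified_9421:
            return "accept_9421"
        # 9421 was present but failed: HMAC is consulted only when the
        # operator has explicitly switched the downgrade guard off.
        if fallback_configured and not only_when_9421_absent:
            return "try_hmac"
        return "reject"
    # No 9421 headers at all: HMAC is the only remaining option.
    return "try_hmac" if fallback_configured else "reject"
```

With the default guard, a failed 9421 signature is always a rejection, regardless of whether a fallback is configured.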

Static methods

def from_shared_secret(*,
secret: bytes,
sender_identity: str,
only_when_9421_absent: bool = True,
window_seconds: int = 300) ‑> LegacyHmacFallback

Convenience constructor for the "one secret, one sender" case.

Covers the common 3.x migration setup where the receiver has exactly one publisher on the legacy scheme and binds them to a known sender_identity (typically a buyer-defined string). For multi-sender or header-derived-identity setups, construct with an options_for callback directly.
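
For the multi-sender case, an options_for callback might look roughly like the sketch below. Everything here is an assumption for illustration: `HmacOptionsSketch` is a stand-in that only mirrors the four fields the source passes to LegacyWebhookHmacOptions, and the `x-sender-tag` header and the `SENDERS` registry are hypothetical.

```python
import time
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class HmacOptionsSketch:
    """Stand-in with the same four fields the source sets on the real options."""

    secret: bytes
    sender_identity: str
    now: float
    window_seconds: int


# Hypothetical registry: sender tag -> (secret, sender_identity).
SENDERS = {
    "publisher-a": (b"secret-a", "pub-a"),
    "publisher-b": (b"secret-b", "pub-b"),
}


def options_for(headers: Mapping[str, str]) -> Optional[HmacOptionsSketch]:
    # Header names are case-insensitive, so normalize before the lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    entry = SENDERS.get(lowered.get("x-sender-tag", ""))
    if entry is None:
        return None  # unknown sender: decline the fallback for this request
    secret, sender_identity = entry
    return HmacOptionsSketch(
        secret=secret,
        sender_identity=sender_identity,
        now=time.time(),  # stamped per request, as from_shared_secret does
        window_seconds=300,
    )
```

In real use the callback would return the library's own options type and would be passed as `LegacyHmacFallback(options_for=...)`.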

Instance variables

var only_when_9421_absent : bool
var options_for : Callable[[Mapping[str, str]], LegacyWebhookHmacOptions | None]
class VerifiedSignerLike (*args, **kwargs)
@runtime_checkable
class VerifiedSignerLike(Protocol):
    """Anything with ``as_sender_identity() -> str``.

    Both :class:`VerifiedWebhookSender` (9421) and :class:`VerifiedLegacyWebhookSender`
    (HMAC) implement this shape, so the receiver treats both verification
    paths identically downstream.
    """

    def as_sender_identity(self) -> str: ...

Anything with as_sender_identity() -> str.

Both VerifiedWebhookSender (9421) and VerifiedLegacyWebhookSender (HMAC) implement this shape, so the receiver treats both verification paths identically downstream.
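
A short sketch of how a runtime-checkable protocol lets two unrelated classes be handled the same way. The names `SignerLike`, `KeyIdSender`, and `SharedSecretSender` are made up for this example and the identity string formats are arbitrary.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SignerLike(Protocol):
    def as_sender_identity(self) -> str: ...


class KeyIdSender:
    """Illustrative stand-in for a sender verified by key id."""

    def __init__(self, key_id: str) -> None:
        self.key_id = key_id

    def as_sender_identity(self) -> str:
        return f"keyid:{self.key_id}"


class SharedSecretSender:
    """Illustrative stand-in for a sender verified by shared secret."""

    def __init__(self, name: str) -> None:
        self.name = name

    def as_sender_identity(self) -> str:
        return f"legacy:{self.name}"


def identity_of(signer: SignerLike) -> str:
    # Neither class inherits from SignerLike; matching is structural.
    return signer.as_sender_identity()
```

Note that isinstance against a runtime-checkable protocol only checks that the method exists, not its signature or return type.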

Ancestors

  • typing.Protocol
  • typing.Generic

Methods

def as_sender_identity(self) ‑> str
class WebhookOutcome (rejected: bool = False,
rejection_reason: RejectionReason | None = None,
response_headers: Mapping[str, str] = <factory>,
sender_identity: str | None = None,
payload: WebhookPayload | None = None,
duplicate: bool = False,
idempotency_key: str | None = None)
@dataclass(frozen=True)
class WebhookOutcome:
    """Result of a single ``receive`` call.

    Exactly one of ``rejected`` or ``payload`` is set. ``duplicate=True`` is
    compatible with a non-None ``payload`` — the payload parsed fine, the
    signature verified fine, it's just a retry the caller should 200 away.
    """

    rejected: bool = False
    rejection_reason: RejectionReason | None = None
    response_headers: Mapping[str, str] = field(default_factory=dict)
    # Populated on successful verify (even when rejected downstream of crypto)
    sender_identity: str | None = None
    # Populated on successful verify + parse
    payload: WebhookPayload | None = None
    duplicate: bool = False
    idempotency_key: str | None = None

Result of a single receive call.

Exactly one of rejected or payload is set. duplicate=True is compatible with a non-None payload — the payload parsed fine, the signature verified fine, it's just a retry the caller should 200 away.
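
One way a caller might turn an outcome into an HTTP status is sketched below. The mapping is an assumption, not something the library prescribes: the usage example at the top of this page returns 401 for every rejection, while this sketch splits rejections by reason. `OutcomeSketch` is a stand-in carrying only the three fields the function reads.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class OutcomeSketch:
    """Stand-in with a subset of WebhookOutcome's fields."""

    rejected: bool = False
    rejection_reason: Optional[str] = None
    duplicate: bool = False


def status_for(outcome: OutcomeSketch) -> int:
    if outcome.rejected:
        reason = outcome.rejection_reason or ""
        if reason.startswith("signature_"):
            return 401  # authentication failure
        if reason == "content_type_invalid":
            return 415  # unsupported media type
        return 400  # malformed body, payload, or idempotency key
    # Accepted deliveries and duplicates both get 2xx so the sender
    # does not treat a retry as a failed delivery.
    return 200
```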

Instance variables

var duplicate : bool
var idempotency_key : str | None
var payload : McpWebhookPayload | RevocationNotification | CollectionListChangedWebhook | PropertyListChangedWebhook | ArtifactWebhookPayload | None
var rejected : bool
var rejection_reason : Literal['signature_missing', 'signature_invalid', 'signature_legacy_failed', 'content_type_invalid', 'body_invalid_json', 'payload_invalid', 'idempotency_key_missing', 'idempotency_key_invalid'] | None
var response_headers : Mapping[str, str]
var sender_identity : str | None
class WebhookReceiver (config: WebhookReceiverConfig)
class WebhookReceiver:
    """Stateless webhook entry point, one instance per receiver configuration.

    Instance state (``config``) is read-only after construction. Per-request
    state lives in the :class:`WebhookOutcome` returned from :meth:`receive`.
    """

    def __init__(self, config: WebhookReceiverConfig) -> None:
        self._config = config

    async def receive(
        self,
        *,
        method: str,
        url: str,
        headers: Mapping[str, str],
        body: bytes,
    ) -> WebhookOutcome:
        """Verify, dedupe, parse. Returns a :class:`WebhookOutcome`.

        Never raises for sender-caused cryptographic or protocol failures —
        returns an outcome with ``rejected=True`` and populated
        ``response_headers`` so the caller can convert to an HTTP response
        without try/except around every call. Operational failures inside
        the dedup backend or verify-options factory MAY still raise; wrap
        the call if you need to 5xx cleanly on internal errors.
        """
        if not _content_type_is_json(headers):
            return _reject("content_type_invalid", sender_identity=None)

        signer, rejection = await self._verify(method=method, url=url, headers=headers, body=body)
        if rejection is not None:
            return rejection
        assert signer is not None  # verification succeeded

        sender_id = signer.as_sender_identity()

        try:
            payload_dict = json.loads(body)
        except json.JSONDecodeError:
            return _reject("body_invalid_json", sender_identity=sender_id)
        if not isinstance(payload_dict, dict):
            return _reject("body_invalid_json", sender_identity=sender_id)

        idempotency_key = payload_dict.get("idempotency_key")
        if not isinstance(idempotency_key, str) or not idempotency_key:
            # Spec 3.0-rc: idempotency_key is REQUIRED on every webhook payload.
            return _reject("idempotency_key_missing", sender_identity=sender_id)
        if not _IDEMPOTENCY_KEY_RE.match(idempotency_key):
            # Non-conformant format — charset or length out of bounds.
            return _reject("idempotency_key_invalid", sender_identity=sender_id)

        parsed = self._parse(payload_dict)
        if parsed is None:
            return _reject("payload_invalid", sender_identity=sender_id)

        is_first_seen = await self._config.dedup.check_and_record(
            sender_id=sender_id, idempotency_key=idempotency_key
        )

        return WebhookOutcome(
            sender_identity=sender_id,
            payload=parsed,
            duplicate=not is_first_seen,
            idempotency_key=idempotency_key,
        )

    def receive_sync(
        self,
        *,
        method: str,
        url: str,
        headers: Mapping[str, str],
        body: bytes,
    ) -> WebhookOutcome:
        """Synchronous wrapper around :meth:`receive` for WSGI-style frameworks.

        Use this from Flask, Gunicorn sync workers, ``http.server``, or any
        other sync-only HTTP entry point where wrapping every call in
        ``asyncio.run(...)`` is just noise::

            @app.post("/webhooks/adcp")
            def hook():
                outcome = receiver.receive_sync(
                    method=request.method,
                    url=request.url,
                    headers=dict(request.headers),
                    body=request.get_data(),
                )
                ...

        Raises :class:`RuntimeError` if invoked from a thread that already has
        a running event loop — the underlying verify / dedup path is async and
        cannot be driven from inside an active loop without blocking it. From
        async code, call :meth:`receive` directly.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop in this thread — safe to spin one up.
            return asyncio.run(self.receive(method=method, url=url, headers=headers, body=body))
        raise RuntimeError(
            "WebhookReceiver.receive_sync() cannot be called from a running "
            "event loop. Use `await receiver.receive(...)` instead."
        )

    async def _verify(
        self,
        *,
        method: str,
        url: str,
        headers: Mapping[str, str],
        body: bytes,
    ) -> tuple[VerifiedSignerLike | None, WebhookOutcome | None]:
        """Returns (signer, None) on success or (None, rejection_outcome)."""
        has_9421 = _has_9421_headers(headers)

        if has_9421:
            try:
                signer = verify_webhook_signature(
                    method=method,
                    url=url,
                    headers=headers,
                    body=body,
                    options=self._config.verify_options,
                )
                return signer, None
            except SignatureVerificationError as exc:
                # Downgrade defense: when 9421 IS present but fails, do NOT
                # consult HMAC fallback by default. A MITM that stripped a
                # valid 9421 signature and replaced it with a forged HMAC one
                # is exactly what the downgrade guard exists for.
                fallback = self._config.legacy_hmac
                allow_hmac = fallback is not None and not fallback.only_when_9421_absent
                if not allow_hmac:
                    return None, WebhookOutcome(
                        rejected=True,
                        rejection_reason="signature_invalid",
                        response_headers=_www_authenticate_header(exc.code),
                    )
                logger.warning(
                    "9421 webhook verify failed (%s); trying HMAC legacy because "
                    "legacy_hmac.only_when_9421_absent=False is set",
                    exc.code,
                )

        fallback = self._config.legacy_hmac
        if fallback is None:
            # No 9421 headers AND no HMAC fallback configured → spec says 9421
            # is baseline-required in 3.0, so this is non-conformant.
            return None, WebhookOutcome(
                rejected=True,
                rejection_reason="signature_missing",
                response_headers=_www_authenticate_header("webhook_signature_required"),
            )

        hmac_options = fallback.options_for(headers)
        if hmac_options is None:
            return None, WebhookOutcome(
                rejected=True,
                rejection_reason="signature_missing",
                response_headers=_www_authenticate_header("webhook_signature_required"),
            )
        try:
            legacy_signer = verify_webhook_hmac(headers=headers, body=body, options=hmac_options)
            return legacy_signer, None
        except LegacyWebhookHmacError:
            return None, WebhookOutcome(
                rejected=True,
                rejection_reason="signature_legacy_failed",
                response_headers=_www_authenticate_header("webhook_signature_invalid"),
            )

    def _parse(self, payload_dict: dict[str, Any]) -> WebhookPayload | None:
        model = _MODEL_BY_KIND[self._config.kind]
        try:
            return cast(WebhookPayload, model.model_validate(payload_dict))
        except ValidationError as exc:
            # Operators need the field-level reason to diagnose sender bugs.
            # The receiver still returns payload_invalid downstream; this is
            # just observability.
            logger.warning(
                "webhook payload failed %s validation: %s",
                self._config.kind,
                exc.errors(include_url=False),
            )
            return None

Stateless webhook entry point, one instance per receiver configuration.

Instance state (config) is read-only after construction. Per-request state lives in the WebhookOutcome returned from receive.

Methods

async def receive(self, *, method: str, url: str, headers: Mapping[str, str], body: bytes) ‑> WebhookOutcome

Verify, dedupe, parse. Returns a WebhookOutcome.

Never raises for sender-caused cryptographic or protocol failures — returns an outcome with rejected=True and populated response_headers so the caller can convert to an HTTP response without try/except around every call. Operational failures inside the dedup backend or verify-options factory MAY still raise; wrap the call if you need to 5xx cleanly on internal errors.
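
Since operational failures may still raise, a caller that wants a clean 5xx could wrap the call along these lines. This is a sketch under assumptions: `receive_or_none` is a hypothetical helper, and it takes the bound `receive` method as a plain awaitable callable so the example runs without the library.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable, Optional

logger = logging.getLogger("webhook_sketch")


async def receive_or_none(
    receive: Callable[..., Awaitable[Any]], **request_parts: Any
) -> Optional[Any]:
    """Await receive(**request_parts); return None on an internal failure."""
    try:
        return await receive(**request_parts)
    except Exception:
        # Sender-caused failures come back as rejected outcomes, so anything
        # raised here is treated as an internal (backend or config) problem.
        logger.exception("webhook receive failed for an internal reason")
        return None
```

A handler would then respond 500 when the helper returns None and fall through to the usual rejected / duplicate checks otherwise.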

def receive_sync(self, *, method: str, url: str, headers: Mapping[str, str], body: bytes) ‑> WebhookOutcome

Synchronous wrapper around receive for WSGI-style frameworks.

Use this from Flask, Gunicorn sync workers, http.server, or any other sync-only HTTP entry point where wrapping every call in asyncio.run(...) is just noise:

@app.post("/webhooks/adcp")
def hook():
    outcome = receiver.receive_sync(
        method=request.method,
        url=request.url,
        headers=dict(request.headers),
        body=request.get_data(),
    )
    ...

Raises RuntimeError if invoked from a thread that already has a running event loop: the underlying verify / dedup path is async and cannot be driven from inside an active loop without blocking it. From async code, call receive directly.
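
The running-loop guard can be shown in isolation. The helper below is a generic sketch of the same pattern, not the library's code; `run_blocking` is a hypothetical name, and it accepts a zero-argument factory rather than a coroutine object so that nothing is left un-awaited when the guard refuses.

```python
import asyncio
from typing import Any, Callable, Coroutine, TypeVar

T = TypeVar("T")


def run_blocking(make_coro: Callable[[], Coroutine[Any, Any, T]]) -> T:
    """Run a coroutine to completion, refusing to nest inside a live loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run is safe.
        return asyncio.run(make_coro())
    raise RuntimeError(
        "run_blocking() called from a running event loop; "
        "await the coroutine instead"
    )
```

Called from plain synchronous code this simply returns the coroutine's result; called from inside a coroutine it raises instead of blocking the loop.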

class WebhookReceiverConfig (verify_options: WebhookVerifyOptions,
dedup: WebhookDedupStore,
legacy_hmac: LegacyHmacFallback | None = None,
kind: WebhookKind = 'mcp')
@dataclass(frozen=True)
class WebhookReceiverConfig:
    """Configuration bundle.

    :param verify_options: verifier configuration (JWKS, replay store, etc.).
        A single instance is reused for every request — the verifier stamps
        ``now`` itself via ``verify_options.clock()``, so there's no need to
        refresh a time field per request.
    :param dedup: webhook-dedup store.
    :param legacy_hmac: optional HMAC-SHA256 fallback for 3.x migration.
    :param kind: which webhook payload type to parse into. Default ``"mcp"``
        (the task-status webhook that dominates most integrations); pass
        explicitly for list-change / artifact / revocation receivers.
    """

    verify_options: WebhookVerifyOptions
    dedup: WebhookDedupStore
    legacy_hmac: LegacyHmacFallback | None = None
    kind: WebhookKind = "mcp"

Configuration bundle.

:param verify_options: verifier configuration (JWKS, replay store, etc.). A single instance is reused for every request; the verifier stamps now itself via verify_options.clock(), so there's no need to refresh a time field per request.

:param dedup: webhook-dedup store.

:param legacy_hmac: optional HMAC-SHA256 fallback for 3.x migration.

:param kind: which webhook payload type to parse into. Default "mcp" (the task-status webhook that dominates most integrations); pass explicitly for list-change / artifact / revocation receivers.
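
The role of kind can be pictured as a lookup from a string to a parser, as in the sketch below. It stands in for the `_MODEL_BY_KIND` table used by `_parse` in the source above, whose contents are not shown on this page. The two payload classes, their field names, and the `PARSERS` table are all hypothetical, and plain dataclasses replace the real validated models.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass(frozen=True)
class TaskStatus:
    task_id: str  # hypothetical field


@dataclass(frozen=True)
class Revocation:
    subject: str  # hypothetical field


# Hypothetical kind -> parser table; one receiver instance uses one entry.
PARSERS: Dict[str, Callable[[Dict[str, Any]], Any]] = {
    "mcp": lambda d: TaskStatus(task_id=d["task_id"]),
    "revocation_notification": lambda d: Revocation(subject=d["subject"]),
}


def parse(kind: str, payload: Dict[str, Any]) -> Optional[Any]:
    parser = PARSERS[kind]  # an unknown kind is a configuration error: raise
    try:
        return parser(payload)
    except KeyError:
        return None  # a missing field maps to a payload_invalid rejection
```

Because the kind is fixed per configuration, a deployment that handles several webhook types would run one receiver per type rather than sniffing the body.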

Instance variables

var dedup : WebhookDedupStore
var kind : Literal['mcp', 'revocation_notification', 'collection_list_changed', 'property_list_changed', 'artifact']
var legacy_hmac : LegacyHmacFallback | None
var verify_options : WebhookVerifyOptions