Module adcp.server.idempotency

Server-side idempotency middleware for AdCP mutating tool handlers.

Implements the seller side of AdCP #2315: extract idempotency_key, look up cached responses scoped by (authenticated_principal, idempotency_key), and replay the cached response verbatim when a subsequent request carries the same key + canonicalized-equivalent payload. Reject key reuse with a different payload as IDEMPOTENCY_CONFLICT.

The spec contract lives at adcontextprotocol/adcp/docs/building/implementation/security.mdx#idempotency.

Typical usage:

from adcp.server import ADCPHandler, IdempotencyStore, MemoryBackend, ToolContext
from adcp.server.responses import capabilities_response

idempotency = IdempotencyStore(
    backend=MemoryBackend(),
    ttl_seconds=86400,  # 24 hours; the recommended floor (spec range is 1h to 7d)
)

class MySeller(ADCPHandler):
    @idempotency.wrap
    async def create_media_buy(self, params, context=None):
        # ``params`` carries ``idempotency_key``; ``context.caller_identity``
        # identifies the principal. Without either, the middleware falls
        # through to this handler with no dedup (schema validation above
        # us rejects missing keys per AdCP #2315).
        return my_create_logic(params)

    async def get_adcp_capabilities(self, params, context=None):
        return capabilities_response(
            ["media_buy"],
            idempotency=idempotency.capability(),
        )

Callers who invoke the handler directly (tests, non-HTTP code paths) must pass a ToolContext with caller_identity set, because the middleware uses it to scope the cache namespace per principal:

ctx = ToolContext(caller_identity="buyer-acme")
result = await seller.create_media_buy(
    {"idempotency_key": key, ...}, ctx
)

Backends:

  • MemoryBackend: in-process dict with TTL; use for tests and single-process reference implementations.
  • PgBackend: scaffold for a SQLAlchemy/asyncpg-backed store that can commit cache writes atomically with business writes. Implementation arrives in a follow-up PR.

Sub-modules

adcp.server.idempotency.backends

Storage backends for IdempotencyStore

adcp.server.idempotency.canonicalize

Canonical payload hashing for AdCP idempotency replay detection …

adcp.server.idempotency.store

The IdempotencyStore coordinator: canonical hashing + backend + decorator …

adcp.server.idempotency.webhook_dedup

Webhook receiver-side dedup store …

Functions

def canonical_json_sha256(payload: dict[str, Any]) ‑> str
def canonical_json_sha256(payload: dict[str, Any]) -> str:
    """Compute the spec's canonical payload fingerprint.

    1. Strip the spec's exclusion list (see :func:`strip_excluded_fields`).
    2. RFC 8785 JCS canonicalize (stable key order, compact, UTF-8, spec-
       compliant number serialization).
    3. SHA-256 over the canonical bytes; return hex digest.

    The result is stable across all conforming JCS implementations. Two
    payloads whose hashes match are equivalent under AdCP replay semantics;
    two with different hashes are distinct and MUST be treated as a conflict
    when the caller supplies the same ``idempotency_key``.
    """
    stripped = strip_excluded_fields(payload)
    canonical = rfc8785.dumps(stripped)
    return hashlib.sha256(canonical).hexdigest()

Compute the spec's canonical payload fingerprint.

  1. Strip the spec's exclusion list (see strip_excluded_fields()).
  2. RFC 8785 JCS canonicalize (stable key order, compact, UTF-8, spec-compliant number serialization).
  3. SHA-256 over the canonical bytes; return hex digest.

The result is stable across all conforming JCS implementations. Two payloads whose hashes match are equivalent under AdCP replay semantics; two with different hashes are distinct and MUST be treated as a conflict when the caller supplies the same idempotency_key.
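
The three steps above can be sketched with the standard library alone. This is an approximation, not the module's implementation: it assumes json.dumps with sorted keys and compact separators as a stand-in for RFC 8785 JCS (the two differ for some floats and for key ordering outside the ASCII range), and HYPOTHETICAL_EXCLUDED is an illustrative exclusion list, not the spec's.

```python
import hashlib
import json
from typing import Any

# Hypothetical exclusion list for illustration; the real one is EXCLUDED_FIELDS.
HYPOTHETICAL_EXCLUDED = frozenset({"trace_id"})


def approx_canonical_sha256(payload: dict[str, Any]) -> str:
    """Approximate the fingerprint: strip, canonicalize, hash."""
    # Step 1: drop excluded top-level keys without mutating the input.
    stripped = {k: v for k, v in payload.items() if k not in HYPOTHETICAL_EXCLUDED}
    # Step 2: stable key order, compact separators, UTF-8 bytes.
    # NOTE: this is not full RFC 8785 (number formatting and key ordering
    # differ in edge cases); it only illustrates the idea.
    canonical = json.dumps(
        stripped, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    ).encode("utf-8")
    # Step 3: SHA-256 hex digest of the canonical bytes.
    return hashlib.sha256(canonical).hexdigest()


# Same content, different key order and different excluded field: same hash.
first = approx_canonical_sha256({"budget": 100, "name": "spring", "trace_id": "a"})
reordered = approx_canonical_sha256({"name": "spring", "trace_id": "b", "budget": 100})
# A changed business field yields a different hash, which would be a conflict.
changed = approx_canonical_sha256({"name": "spring", "budget": 200})
```

Interoperable implementations should use a real JCS library, as the function above does with rfc8785.dumps.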

def strip_excluded_fields(payload: dict[str, Any]) ‑> dict[str, typing.Any]
def strip_excluded_fields(payload: dict[str, Any]) -> dict[str, Any]:
    """Return a deep copy of ``payload`` with the spec's exclusion list removed.

    Top-level keys in :data:`EXCLUDED_FIELDS` are dropped. Nested paths in
    ``_NESTED_EXCLUSIONS`` are traversed; the final leaf key is removed if the
    traversal reaches it. Missing intermediate keys are a no-op — the caller's
    payload is free to omit the push_notification_config entirely.

    The input dict is never mutated.
    """
    out: dict[str, Any] = copy.deepcopy(payload)
    for key in EXCLUDED_FIELDS:
        out.pop(key, None)
    for path in _NESTED_EXCLUSIONS:
        _drop_nested(out, path)
    return out

Return a deep copy of payload with the spec's exclusion list removed.

Top-level keys in EXCLUDED_FIELDS are dropped. Nested paths in _NESTED_EXCLUSIONS are traversed; the final leaf key is removed if the traversal reaches it. Missing intermediate keys are a no-op, so the caller's payload is free to omit push_notification_config entirely.

The input dict is never mutated.
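
The _drop_nested helper is not shown on this page. A minimal sketch of how such a traversal might work follows; the helper body, EXAMPLE_EXCLUDED_FIELDS, and EXAMPLE_NESTED_EXCLUSIONS (including the "token" leaf) are assumptions made for illustration, not the module's actual values.

```python
import copy
from typing import Any

# Hypothetical stand-ins; the real values live in the canonicalize module.
EXAMPLE_EXCLUDED_FIELDS = frozenset({"trace_id"})
EXAMPLE_NESTED_EXCLUSIONS = (("push_notification_config", "token"),)


def drop_nested(data: dict[str, Any], path: tuple[str, ...]) -> None:
    """Remove the leaf named by ``path`` in place; no-op if the path is absent."""
    if not path:
        return
    node: Any = data
    for part in path[:-1]:
        if not isinstance(node, dict) or part not in node:
            return  # Missing intermediate key: nothing to remove.
        node = node[part]
    if isinstance(node, dict):
        node.pop(path[-1], None)


def strip_fields(payload: dict[str, Any]) -> dict[str, Any]:
    out = copy.deepcopy(payload)  # Work on a copy so the caller's dict is untouched.
    for key in EXAMPLE_EXCLUDED_FIELDS:
        out.pop(key, None)
    for path in EXAMPLE_NESTED_EXCLUSIONS:
        drop_nested(out, path)
    return out


original = {
    "trace_id": "t-1",
    "push_notification_config": {"url": "https://example.test/hook", "token": "secret"},
}
stripped = strip_fields(original)
no_config = strip_fields({"budget": 100})  # Path absent entirely: unchanged.
```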

Classes

class CachedResponse (payload_hash: str, response: dict[str, Any], expires_at_epoch: float)
@dataclass(frozen=True)
class CachedResponse:
    """A single cached handler response keyed by ``(principal_id, key)``.

    :param payload_hash: Canonical JSON SHA-256 of the *original* request. On
        replay we compare the new request's hash to this value; mismatch is
        ``IDEMPOTENCY_CONFLICT``.
    :param response: The response dict the handler returned. Returned verbatim
        on replay — the seller injects ``replayed: true`` at the envelope
        level before sending.
    :param expires_at_epoch: Unix timestamp (seconds) when this entry becomes
        eligible for eviction. Reads after this time return None.
    """

    payload_hash: str
    response: dict[str, Any]
    expires_at_epoch: float

A single cached handler response keyed by (principal_id, key).

  • payload_hash: Canonical JSON SHA-256 of the original request. On replay the new request's hash is compared to this value; a mismatch is IDEMPOTENCY_CONFLICT.
  • response: The response dict the handler returned. Returned verbatim on replay; the seller injects replayed: true at the envelope level before sending.
  • expires_at_epoch: Unix timestamp (seconds) when this entry becomes eligible for eviction. Reads after this time return None.

Instance variables

var expires_at_epoch : float
var payload_hash : str
var response : dict[str, typing.Any]
class IdempotencyBackend
class IdempotencyBackend(ABC):
    """Abstract storage backend contract.

    All methods are async. Implementations MUST be safe to call concurrently
    from multiple asyncio tasks — :class:`IdempotencyStore` does not serialize
    access on the caller's behalf.
    """

    @abstractmethod
    async def get(
        self, principal_id: str, key: str
    ) -> CachedResponse | None:
        """Return the cached entry, or None if missing or expired."""

    @abstractmethod
    async def put(
        self,
        principal_id: str,
        key: str,
        entry: CachedResponse,
    ) -> None:
        """Store ``entry`` under ``(principal_id, key)``. Overwrites any prior
        entry — the store only calls ``put`` after verifying the slot is empty
        or expired, so an overwrite in that window is a legitimate retry of
        the write itself."""

    @abstractmethod
    async def delete_expired(self, now_epoch: float | None = None) -> int:
        """Best-effort sweep of expired entries. Returns the count removed.

        Sweeping is optional — :meth:`get` MUST self-filter expired entries.
        Backends that have natural TTL primitives (Redis ``EXPIRE``, Postgres
        partial indexes) may implement this as a no-op."""

Abstract storage backend contract.

All methods are async. Implementations MUST be safe to call concurrently from multiple asyncio tasks, because IdempotencyStore does not serialize access on the caller's behalf.

Ancestors

  • abc.ABC

Subclasses

  • MemoryBackend
  • PgBackend

Methods

async def delete_expired(self, now_epoch: float | None = None) ‑> int
@abstractmethod
async def delete_expired(self, now_epoch: float | None = None) -> int:
    """Best-effort sweep of expired entries. Returns the count removed.

    Sweeping is optional — :meth:`get` MUST self-filter expired entries.
    Backends that have natural TTL primitives (Redis ``EXPIRE``, Postgres
    partial indexes) may implement this as a no-op."""

Best-effort sweep of expired entries. Returns the count removed.

Sweeping is optional: get MUST self-filter expired entries. Backends that have natural TTL primitives (Redis EXPIRE, Postgres partial indexes) may implement this as a no-op.

async def get(self, principal_id: str, key: str) ‑> CachedResponse | None
@abstractmethod
async def get(
    self, principal_id: str, key: str
) -> CachedResponse | None:
    """Return the cached entry, or None if missing or expired."""

Return the cached entry, or None if missing or expired.

async def put(self,
principal_id: str,
key: str,
entry: CachedResponse) ‑> None
@abstractmethod
async def put(
    self,
    principal_id: str,
    key: str,
    entry: CachedResponse,
) -> None:
    """Store ``entry`` under ``(principal_id, key)``. Overwrites any prior
    entry — the store only calls ``put`` after verifying the slot is empty
    or expired, so an overwrite in that window is a legitimate retry of
    the write itself."""

Store entry under (principal_id, key). Overwrites any prior entry — the store only calls put after verifying the slot is empty or expired, so an overwrite in that window is a legitimate retry of the write itself.
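
As an illustration of the get/put/delete_expired contract, here is a sketch of a custom backend built on the standard-library sqlite3 module. It is an assumption-laden example: CachedResponse is redefined locally as a stand-in, SqliteBackend and its table layout are hypothetical, and a real subclass would inherit from IdempotencyBackend and use a non-blocking driver rather than calling sqlite3 directly inside async methods.

```python
import asyncio
import json
import sqlite3
import time
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class CachedResponse:  # Local stand-in mirroring the documented dataclass.
    payload_hash: str
    response: dict[str, Any]
    expires_at_epoch: float


class SqliteBackend:
    """Illustrative backend; a real one would subclass IdempotencyBackend."""

    def __init__(self, path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS idem ("
            " principal_id TEXT NOT NULL, idem_key TEXT NOT NULL,"
            " payload_hash TEXT NOT NULL, response TEXT NOT NULL,"
            " expires_at REAL NOT NULL, PRIMARY KEY (principal_id, idem_key))"
        )

    async def get(self, principal_id: str, key: str) -> Optional[CachedResponse]:
        # Self-filter expired rows, as the contract requires of get().
        row = self._conn.execute(
            "SELECT payload_hash, response, expires_at FROM idem"
            " WHERE principal_id = ? AND idem_key = ? AND expires_at > ?",
            (principal_id, key, time.time()),
        ).fetchone()
        if row is None:
            return None
        return CachedResponse(row[0], json.loads(row[1]), row[2])

    async def put(self, principal_id: str, key: str, entry: CachedResponse) -> None:
        # Overwrite any prior entry for the same (principal_id, key).
        with self._conn:
            self._conn.execute(
                "INSERT OR REPLACE INTO idem VALUES (?, ?, ?, ?, ?)",
                (principal_id, key, entry.payload_hash,
                 json.dumps(entry.response), entry.expires_at_epoch),
            )

    async def delete_expired(self, now_epoch: Optional[float] = None) -> int:
        cutoff = time.time() if now_epoch is None else now_epoch
        with self._conn:
            cur = self._conn.execute("DELETE FROM idem WHERE expires_at <= ?", (cutoff,))
        return cur.rowcount


async def demo() -> tuple:
    backend = SqliteBackend()
    live = CachedResponse("hash-1", {"media_buy_id": "mb-1"}, time.time() + 3600)
    stale = CachedResponse("hash-2", {"media_buy_id": "mb-2"}, time.time() - 1)
    await backend.put("buyer-acme", "key-live", live)
    await backend.put("buyer-acme", "key-stale", stale)
    hit = await backend.get("buyer-acme", "key-live")
    miss = await backend.get("buyer-acme", "key-stale")    # Expired: filtered out.
    other = await backend.get("buyer-other", "key-live")   # Different principal.
    removed = await backend.delete_expired()
    return hit, miss, other, removed


hit, miss, other_principal, removed = asyncio.run(demo())
```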

class IdempotencyStore (backend: IdempotencyBackend,
ttl_seconds: int = 86400,
hash_fn: Callable[[dict[str, Any]], str] = canonical_json_sha256,
*,
clock: Callable[[], float] = time.time)
class IdempotencyStore:
    """Coordinator that binds canonical hashing to a storage backend.

    :param backend: A concrete :class:`IdempotencyBackend`.
    :param ttl_seconds: How long cached responses remain replayable. Must be
        within the spec's ``[3600, 604800]`` range (1h to 7d). 86400 (24h) is
        the recommended floor and matches the compliance storyboard.
    :param hash_fn: Optional override for the canonical hash function. Defaults
        to :func:`canonical_json_sha256`. Exposed for tests and for anyone who
        wants to experiment with alternative equivalence rules — though note
        the spec mandates RFC 8785 JCS for interop.
    """

    def __init__(
        self,
        backend: IdempotencyBackend,
        ttl_seconds: int = 86400,
        hash_fn: Callable[[dict[str, Any]], str] = canonical_json_sha256,
        *,
        clock: Callable[[], float] = time.time,
    ) -> None:
        if not _MIN_TTL_SECONDS <= ttl_seconds <= _MAX_TTL_SECONDS:
            raise ValueError(
                f"ttl_seconds must be in [{_MIN_TTL_SECONDS}, {_MAX_TTL_SECONDS}] "
                f"per AdCP spec (capabilities.idempotency.replay_ttl_seconds), "
                f"got {ttl_seconds}"
            )
        self.backend = backend
        self.ttl_seconds = ttl_seconds
        self._hash_fn = hash_fn
        self._clock = clock

    def capability(self) -> dict[str, Any]:
        """Return the capabilities fragment declaring this store's replay window.

        Embed under ``capabilities.adcp.idempotency`` on the seller's
        ``get_adcp_capabilities`` response. Buyers read this to reason about
        retry-safe windows (AdCP #2315)::

            caps.adcp.idempotency = idempotency.capability()
            # → {"supported": True, "replay_ttl_seconds": 86400}

        ``supported`` became REQUIRED in AdCP 3.0 GA — agents emitting only
        ``replay_ttl_seconds`` fail strict schema validation on the new
        capabilities response.
        """
        return {"supported": True, "replay_ttl_seconds": self.ttl_seconds}

    def wrap(self, handler: HandlerFn) -> HandlerFn:
        """Decorator that adds idempotency semantics to an AdCP handler method.

        The wrapped handler is called as ``handler(self, params, context)``.
        ``params`` may be a dict or a Pydantic model — both are normalized to
        a dict before hashing. The return value is coerced to a dict for
        caching (via ``model_dump`` if Pydantic).

        The decorator always returns the handler's original object on a cache
        miss and a best-effort Pydantic re-validation on a hit (when the
        handler's declared return type exposes ``model_validate``). Callers
        that return raw dicts get dicts back.
        """

        @wraps(handler)
        async def _wrapped(
            handler_self: Any,
            params: Any,
            context: Any = None,
            *args: Any,
            **kwargs: Any,
        ) -> Any:
            principal_id, idempotency_key, params_dict = self._prepare(params, context)
            if principal_id is None or idempotency_key is None:
                # No key or no principal → skip dedup. The spec says a missing
                # key MUST be rejected with INVALID_REQUEST; we let the handler
                # run so validation layers above us (Pydantic, FastAPI, etc.)
                # can reject with a typed error. The middleware only dedups
                # when both a key and a principal are present.
                return await handler(handler_self, params, context, *args, **kwargs)

            payload_hash = self._hash_fn(params_dict)

            cached = await self.backend.get(principal_id, idempotency_key)
            if cached is not None:
                if cached.payload_hash == payload_hash:
                    logger.debug(
                        "idempotency replay: principal=%s key_prefix=%s",
                        principal_id,
                        idempotency_key[:8],
                    )
                    return _clone_response(cached.response)
                # Same key, different payload — spec-defined conflict.
                raise IdempotencyConflictError(
                    operation=getattr(handler, "__name__", "handler"),
                    errors=[
                        {
                            "code": "IDEMPOTENCY_CONFLICT",
                            "message": (
                                "idempotency_key reused with a different payload "
                                "(canonical hash mismatch)"
                            ),
                        }
                    ],
                )

            response = await handler(handler_self, params, context, *args, **kwargs)
            # Deep-copy when caching so post-return mutation of the caller's
            # copy can't poison future replays. `_clone_response` also deep-
            # copies on the hit path, giving independent objects per replay.
            response_dict = copy.deepcopy(_to_dict(response))
            entry = CachedResponse(
                payload_hash=payload_hash,
                response=response_dict,
                expires_at_epoch=self._clock() + self.ttl_seconds,
            )
            # Commit cache AFTER handler returns. Atomicity with the handler's
            # side effects depends on the backend: MemoryBackend is best-effort
            # (no transactional relationship to external resources); PgBackend
            # (follow-up) will commit in the same transaction when the handler
            # uses the same engine. On put failure we log loudly and return
            # the handler's response — swallowing the exception would be wrong
            # (operators need the signal that caching is broken), and raising
            # would look to the caller like the handler failed, triggering a
            # retry that re-executes side effects. Best compromise: warn
            # operators, return the result, and accept that the next retry
            # with this key will re-execute.
            try:
                await self.backend.put(principal_id, idempotency_key, entry)
            except Exception:
                logger.warning(
                    "Idempotency cache put failed for principal=%s key_prefix=%s — "
                    "handler completed but a subsequent retry with this key will "
                    "re-execute rather than replay. This indicates an operational "
                    "issue with the idempotency backend.",
                    principal_id,
                    idempotency_key[:8],
                    exc_info=True,
                )
            return response

        return _wrapped

    def _prepare(self, params: Any, context: Any) -> tuple[str | None, str | None, dict[str, Any]]:
        """Normalize inputs and extract the (principal, key, params_dict) tuple.

        Returns ``(None, None, params_dict)`` when idempotency doesn't apply
        (no caller identity or no key supplied). The caller falls through to
        the plain handler in that case — validation of missing-key lives in
        the request schema, not here.
        """
        params_dict = _to_dict(params)
        idempotency_key = params_dict.get("idempotency_key")
        if not isinstance(idempotency_key, str) or not idempotency_key:
            return None, None, params_dict
        principal_id = _extract_principal_id(context)
        if principal_id is None:
            # No caller identity: we can't safely scope the key. Spec requires
            # per-principal scope; anything else is a cross-principal replay
            # attack surface. Fall through to the handler (which will process
            # the request normally — no dedup, but no security regression).
            self._warn_missing_principal_once()
            return None, None, params_dict
        return principal_id, idempotency_key, params_dict

    _missing_principal_warned: bool = False

    def _warn_missing_principal_once(self) -> None:
        """Emit a one-time warning when the middleware sees a key but no principal.

        Silent fall-through is the worst DX: the seller drops in
        ``@idempotency.wrap``, ships, and doesn't discover until incident
        review that no dedup ever happened. Fire once per store instance so
        operators see the signal without filling logs on every request.
        """
        if self._missing_principal_warned:
            return
        self._missing_principal_warned = True
        warnings.warn(
            "IdempotencyStore received a request with idempotency_key but no "
            "caller_identity on ToolContext — dedup is SKIPPED. This usually "
            "means your transport isn't populating the authenticated principal. "
            "A2A: wire an a2a-sdk auth middleware that sets ServerCallContext.user; "
            "MCP: populate ToolContext.caller_identity from your FastMCP auth "
            "middleware (see adcp.server.idempotency README). "
            "This warning fires once per IdempotencyStore instance.",
            UserWarning,
            stacklevel=3,
        )

Coordinator that binds canonical hashing to a storage backend.

  • backend: A concrete IdempotencyBackend.
  • ttl_seconds: How long cached responses remain replayable. Must be within the spec's [3600, 604800] range (1h to 7d). 86400 (24h) is the recommended floor and matches the compliance storyboard.
  • hash_fn: Optional override for the canonical hash function. Defaults to canonical_json_sha256(). Exposed for tests and for anyone who wants to experiment with alternative equivalence rules, though note the spec mandates RFC 8785 JCS for interop.

Methods

def capability(self) ‑> dict[str, typing.Any]
def capability(self) -> dict[str, Any]:
    """Return the capabilities fragment declaring this store's replay window.

    Embed under ``capabilities.adcp.idempotency`` on the seller's
    ``get_adcp_capabilities`` response. Buyers read this to reason about
    retry-safe windows (AdCP #2315)::

        caps.adcp.idempotency = idempotency.capability()
        # → {"supported": True, "replay_ttl_seconds": 86400}

    ``supported`` became REQUIRED in AdCP 3.0 GA — agents emitting only
    ``replay_ttl_seconds`` fail strict schema validation on the new
    capabilities response.
    """
    return {"supported": True, "replay_ttl_seconds": self.ttl_seconds}

Return the capabilities fragment declaring this store's replay window.

Embed under capabilities.adcp.idempotency on the seller's get_adcp_capabilities response. Buyers read this to reason about retry-safe windows (AdCP #2315):

caps.adcp.idempotency = idempotency.capability()
# → {"supported": True, "replay_ttl_seconds": 86400}

supported became REQUIRED in AdCP 3.0 GA — agents emitting only replay_ttl_seconds fail strict schema validation on the new capabilities response.
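
A standalone sketch of the fragment and the TTL range check, for readers who want to see the shape without the store. The bounds come from the IdempotencyStore docstring; capability_fragment and the buyer-side retry_is_replay_safe helper are hypothetical names introduced here, not part of the library.

```python
from typing import Any

# Bounds quoted in the IdempotencyStore docstring: 1 hour to 7 days.
MIN_TTL_SECONDS = 3600
MAX_TTL_SECONDS = 604800


def capability_fragment(ttl_seconds: int) -> dict[str, Any]:
    """Build the idempotency capability fragment after checking the TTL range."""
    if not MIN_TTL_SECONDS <= ttl_seconds <= MAX_TTL_SECONDS:
        raise ValueError(f"ttl_seconds out of range: {ttl_seconds}")
    return {"supported": True, "replay_ttl_seconds": ttl_seconds}


def retry_is_replay_safe(fragment: dict[str, Any], seconds_since_first_attempt: float) -> bool:
    """Hypothetical buyer-side check: is a retry still inside the replay window?"""
    return bool(fragment.get("supported")) and (
        seconds_since_first_attempt < fragment["replay_ttl_seconds"]
    )


fragment = capability_fragment(86400)
```

A buyer that retries after the window has closed should expect the seller to treat the request as new rather than replay it.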

def wrap(self, handler: HandlerFn) ‑> Callable[..., Awaitable[typing.Any]]
def wrap(self, handler: HandlerFn) -> HandlerFn:
    """Decorator that adds idempotency semantics to an AdCP handler method.

    The wrapped handler is called as ``handler(self, params, context)``.
    ``params`` may be a dict or a Pydantic model — both are normalized to
    a dict before hashing. The return value is coerced to a dict for
    caching (via ``model_dump`` if Pydantic).

    The decorator always returns the handler's original object on a cache
    miss and a best-effort Pydantic re-validation on a hit (when the
    handler's declared return type exposes ``model_validate``). Callers
    that return raw dicts get dicts back.
    """

    @wraps(handler)
    async def _wrapped(
        handler_self: Any,
        params: Any,
        context: Any = None,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        principal_id, idempotency_key, params_dict = self._prepare(params, context)
        if principal_id is None or idempotency_key is None:
            # No key or no principal → skip dedup. The spec says a missing
            # key MUST be rejected with INVALID_REQUEST; we let the handler
            # run so validation layers above us (Pydantic, FastAPI, etc.)
            # can reject with a typed error. The middleware only dedups
            # when both a key and a principal are present.
            return await handler(handler_self, params, context, *args, **kwargs)

        payload_hash = self._hash_fn(params_dict)

        cached = await self.backend.get(principal_id, idempotency_key)
        if cached is not None:
            if cached.payload_hash == payload_hash:
                logger.debug(
                    "idempotency replay: principal=%s key_prefix=%s",
                    principal_id,
                    idempotency_key[:8],
                )
                return _clone_response(cached.response)
            # Same key, different payload — spec-defined conflict.
            raise IdempotencyConflictError(
                operation=getattr(handler, "__name__", "handler"),
                errors=[
                    {
                        "code": "IDEMPOTENCY_CONFLICT",
                        "message": (
                            "idempotency_key reused with a different payload "
                            "(canonical hash mismatch)"
                        ),
                    }
                ],
            )

        response = await handler(handler_self, params, context, *args, **kwargs)
        # Deep-copy when caching so post-return mutation of the caller's
        # copy can't poison future replays. `_clone_response` also deep-
        # copies on the hit path, giving independent objects per replay.
        response_dict = copy.deepcopy(_to_dict(response))
        entry = CachedResponse(
            payload_hash=payload_hash,
            response=response_dict,
            expires_at_epoch=self._clock() + self.ttl_seconds,
        )
        # Commit cache AFTER handler returns. Atomicity with the handler's
        # side effects depends on the backend: MemoryBackend is best-effort
        # (no transactional relationship to external resources); PgBackend
        # (follow-up) will commit in the same transaction when the handler
        # uses the same engine. On put failure we log loudly and return
        # the handler's response — swallowing the exception would be wrong
        # (operators need the signal that caching is broken), and raising
        # would look to the caller like the handler failed, triggering a
        # retry that re-executes side effects. Best compromise: warn
        # operators, return the result, and accept that the next retry
        # with this key will re-execute.
        try:
            await self.backend.put(principal_id, idempotency_key, entry)
        except Exception:
            logger.warning(
                "Idempotency cache put failed for principal=%s key_prefix=%s — "
                "handler completed but a subsequent retry with this key will "
                "re-execute rather than replay. This indicates an operational "
                "issue with the idempotency backend.",
                principal_id,
                idempotency_key[:8],
                exc_info=True,
            )
        return response

    return _wrapped

Decorator that adds idempotency semantics to an AdCP handler method.

The wrapped handler is called as handler(self, params, context). params may be a dict or a Pydantic model — both are normalized to a dict before hashing. The return value is coerced to a dict for caching (via model_dump if Pydantic).

The decorator always returns the handler's original object on a cache miss and a best-effort Pydantic re-validation on a hit (when the handler's declared return type exposes model_validate). Callers that return raw dicts get dicts back.
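
The replay, conflict, and fall-through branches can be seen in isolation with a stripped-down decorator. This is a simplified sketch, not the library's wrap: it assumes a plain async function taking (params, principal) instead of (self, params, context), uses a dict without TTL, hashes with sorted-key JSON rather than JCS, and ConflictError stands in for IdempotencyConflictError.

```python
import asyncio
import copy
import hashlib
import json
from functools import wraps
from typing import Any, Awaitable, Callable, Optional


class ConflictError(Exception):
    """Stand-in for IdempotencyConflictError."""


def simple_idempotent(handler: Callable[..., Awaitable[dict[str, Any]]]):
    # Maps (principal, key) -> (payload hash, cached response). No TTL here.
    cache: dict = {}

    @wraps(handler)
    async def wrapped(params: dict[str, Any], principal: Optional[str] = None) -> dict[str, Any]:
        key = params.get("idempotency_key")
        if not key or principal is None:
            # No key or no principal: run the handler without dedup.
            return await handler(params)
        digest = hashlib.sha256(json.dumps(params, sort_keys=True).encode()).hexdigest()
        slot = cache.get((principal, key))
        if slot is not None:
            cached_digest, cached_response = slot
            if cached_digest != digest:
                raise ConflictError("IDEMPOTENCY_CONFLICT")
            return copy.deepcopy(cached_response)  # Replay without re-executing.
        response = await handler(params)
        cache[(principal, key)] = (digest, copy.deepcopy(response))
        return response

    return wrapped


calls = 0


@simple_idempotent
async def create_media_buy(params: dict[str, Any]) -> dict[str, Any]:
    global calls
    calls += 1
    return {"media_buy_id": f"mb-{calls}", "budget": params["budget"]}


async def demo() -> tuple:
    first = await create_media_buy({"idempotency_key": "k1", "budget": 100}, "buyer-acme")
    replay = await create_media_buy({"idempotency_key": "k1", "budget": 100}, "buyer-acme")
    # Same key under another principal is a separate cache slot.
    other = await create_media_buy({"idempotency_key": "k1", "budget": 100}, "buyer-other")
    try:
        await create_media_buy({"idempotency_key": "k1", "budget": 999}, "buyer-acme")
        conflict = False
    except ConflictError:
        conflict = True
    return first, replay, other, conflict


first, replay, other, conflict = asyncio.run(demo())
```

The handler runs twice in this demo: once for each principal. The replay and the conflicting request never reach it.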

class MemoryBackend (*, clock: Callable[[], float] = time.time)
class MemoryBackend(IdempotencyBackend):
    """In-process dict-backed store.

    Suitable for tests, single-process reference implementations, and local
    development. **Not suitable for multi-process deployments** — each worker
    has its own cache, so a retry that lands on a different worker is treated
    as a fresh request.

    Concurrency: the backend uses an :class:`asyncio.Lock` to serialize
    mutations of the shared dict across asyncio tasks. Reads go through the
    lock too; for a pure in-process backend this is cheap and keeps ``get``
    and ``put`` from interleaving. ``asyncio.Lock`` is not thread-safe, so
    share an instance only within a single event loop.

    :param clock: Callable returning the current epoch seconds. Override for
        tests that need to advance time deterministically without monkeypatching
        :mod:`time`. Defaults to :func:`time.time`.
    """

    def __init__(self, *, clock: Callable[[], float] = time.time) -> None:
        self._store: dict[tuple[str, str], CachedResponse] = {}
        self._lock = asyncio.Lock()
        self._clock = clock

    async def get(
        self, principal_id: str, key: str
    ) -> CachedResponse | None:
        async with self._lock:
            entry = self._store.get((principal_id, key))
            if entry is None:
                return None
            if entry.expires_at_epoch <= self._clock():
                # Lazy expiry — drop the stale entry so the next request
                # treats the slot as fresh and races to repopulate.
                del self._store[(principal_id, key)]
                return None
            return entry

    async def put(
        self,
        principal_id: str,
        key: str,
        entry: CachedResponse,
    ) -> None:
        async with self._lock:
            self._store[(principal_id, key)] = entry

    async def delete_expired(self, now_epoch: float | None = None) -> int:
        cutoff = now_epoch if now_epoch is not None else self._clock()
        async with self._lock:
            stale = [k for k, v in self._store.items() if v.expires_at_epoch <= cutoff]
            for k in stale:
                del self._store[k]
            return len(stale)

    async def clear(self) -> None:
        """Remove all cached entries.

        Test-suite hook — handy for resetting state between fixtures when a
        single :class:`MemoryBackend` is shared across multiple tests.
        """
        async with self._lock:
            self._store.clear()

    async def _size(self) -> int:
        """Test-only: return the current entry count."""
        async with self._lock:
            return len(self._store)

In-process dict-backed store.

Suitable for tests, single-process reference implementations, and local development. Not suitable for multi-process deployments — each worker has its own cache, so a retry that lands on a different worker is treated as a fresh request.

Concurrency: the backend uses an asyncio.Lock to serialize mutations of the shared dict across asyncio tasks. Reads go through the lock too; for a pure in-process backend this is cheap and keeps get and put from interleaving. asyncio.Lock is not thread-safe, so share an instance only within a single event loop.

  • clock: Callable returning the current epoch seconds. Override for tests that need to advance time deterministically without monkeypatching time. Defaults to time.time.
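
One way to use the clock parameter in tests is a callable object whose time only moves on request. The sketch below is illustrative: FakeClock and TinyTtlStore are hypothetical names, and TinyTtlStore is a synchronous stand-in that mimics lazy expiry so the example runs without the library. With the real classes, the same clock object would presumably be passed as MemoryBackend(clock=clock) and IdempotencyStore(..., clock=clock).

```python
from typing import Any, Optional


class FakeClock:
    """Callable clock whose time only moves when the test says so."""

    def __init__(self, start: float = 1_000_000.0) -> None:
        self.now = start

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


class TinyTtlStore:
    """Hypothetical stand-in for a backend with lazy expiry."""

    def __init__(self, clock) -> None:
        self._clock = clock
        self._entries: dict = {}

    def put(self, key: str, value: Any, ttl_seconds: float) -> None:
        self._entries[key] = (value, self._clock() + ttl_seconds)

    def get(self, key: str) -> Optional[Any]:
        item = self._entries.get(key)
        if item is None:
            return None
        value, expires_at = item
        if expires_at <= self._clock():
            del self._entries[key]  # Lazy expiry on read.
            return None
        return value


clock = FakeClock()
store = TinyTtlStore(clock)
store.put("k1", {"ok": True}, ttl_seconds=3600)
before = store.get("k1")
clock.advance(3600)  # Exactly at the expiry instant counts as expired.
after = store.get("k1")
```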

Ancestors

  • IdempotencyBackend
  • abc.ABC

Methods

async def clear(self) ‑> None
async def clear(self) -> None:
    """Remove all cached entries.

    Test-suite hook — handy for resetting state between fixtures when a
    single :class:`MemoryBackend` is shared across multiple tests.
    """
    async with self._lock:
        self._store.clear()

Remove all cached entries.

Test-suite hook, handy for resetting state between fixtures when a single MemoryBackend is shared across multiple tests.

Inherited members

class PgBackend (*args: Any, **kwargs: Any)
class PgBackend(IdempotencyBackend):
    """PostgreSQL-backed store — **scaffold, not yet implemented**.

    .. warning::
       Calling ``PgBackend(...)`` raises ``NotImplementedError`` today. Use
       :class:`MemoryBackend` for tests, or implement your own
       :class:`IdempotencyBackend` subclass against the database of your
       choice until this implementation lands. Tracked at
       https://github.com/adcontextprotocol/adcp-client-python/issues/182.

    **Design intent.** Share a transaction with the handler's business
    writes so the cache entry commits atomically with side effects. Without
    that, a crash between ``handler success`` and ``cache commit`` causes
    the retry to re-execute the handler, duplicating side effects.

    **Schema sketch for the implementer.**

    .. code-block:: sql

        CREATE TABLE adcp_idempotency (
            principal_id TEXT       COLLATE "C" NOT NULL,
            key          TEXT       COLLATE "C" NOT NULL,
            payload_hash TEXT       NOT NULL,
            response     JSONB      NOT NULL,
            expires_at   TIMESTAMPTZ NOT NULL,
            PRIMARY KEY (principal_id, key)
        );

    Notes:

    * ``COLLATE "C"`` (or ``CITEXT`` with a deliberate case policy) — avoid
      the default locale collation on the identifier columns. On some
      locales ``Principal-A`` and ``principal-a`` compare equal, which
      would collapse distinct tenants into the same cache slot.
    * Queries MUST filter on ``principal_id`` in the ``WHERE`` clause even
      with the composite PK — row-level security (RLS) enforced via a
      policy like ``USING (principal_id = current_setting('adcp.principal_id')::text)``
      gives belt-and-suspenders protection against accidental cross-tenant
      reads in future handlers.
    * ``get`` uses ``SELECT ... WHERE expires_at > now()``.
    * ``put`` uses ``INSERT ... ON CONFLICT (principal_id, key) DO UPDATE``.
    * Accept a SQLAlchemy/asyncpg session factory so the caller can thread
      the handler's transaction through for atomic commit — the atomicity
      guarantee is the whole reason to use a SQL backend.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        raise NotImplementedError(
            "PgBackend is scaffolded but not yet implemented. Use MemoryBackend "
            "for tests, or implement your own IdempotencyBackend subclass "
            "against your database of choice until the PgBackend implementation "
            "lands. Tracking: "
            "https://github.com/adcontextprotocol/adcp-client-python/issues/182."
        )

    async def get(
        self, principal_id: str, key: str
    ) -> CachedResponse | None:  # pragma: no cover
        raise NotImplementedError

    async def put(
        self,
        principal_id: str,
        key: str,
        entry: CachedResponse,
    ) -> None:  # pragma: no cover
        raise NotImplementedError

    async def delete_expired(
        self, now_epoch: float | None = None
    ) -> int:  # pragma: no cover
        raise NotImplementedError

PostgreSQL-backed store — scaffold, not yet implemented.

Warning

Calling PgBackend(…) raises NotImplementedError today. Use :class:MemoryBackend for tests, or implement your own :class:IdempotencyBackend subclass against the database of your choice until this implementation lands. Tracked at https://github.com/adcontextprotocol/adcp-client-python/issues/182.

Design intent. Share a transaction with the handler's business writes so the cache entry commits atomically with side effects. Without that, a crash between handler success and cache commit causes the retry to re-execute the handler, duplicating side effects.

Schema sketch for the implementer.

CREATE TABLE adcp_idempotency (
    principal_id TEXT       COLLATE "C" NOT NULL,
    key          TEXT       COLLATE "C" NOT NULL,
    payload_hash TEXT       NOT NULL,
    response     JSONB      NOT NULL,
    expires_at   TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (principal_id, key)
);

Notes:

  • COLLATE "C" (or CITEXT with a deliberate case policy) — avoid the default locale collation on the identifier columns. On some locales Principal-A and principal-a compare equal, which would collapse distinct tenants into the same cache slot.
  • Queries MUST filter on principal_id in the WHERE clause even with the composite PK. Row-level security (RLS), enforced via a policy like USING (principal_id = current_setting('adcp.principal_id')::text), adds belt-and-suspenders protection against accidental cross-tenant reads in future handlers.
  • get uses SELECT ... WHERE expires_at > now().
  • put uses INSERT … ON CONFLICT (principal_id, key) DO UPDATE.
  • Accept a SQLAlchemy/asyncpg session factory so the caller can thread the handler's transaction through for atomic commit — the atomicity guarantee is the whole reason to use a SQL backend.
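
As a rough illustration of the get and put notes above, here is a minimal sketch that uses the standard-library sqlite3 module as a stand-in for PostgreSQL so it runs anywhere. The helper names (put, get), the REAL expiry column, and the JSON-as-TEXT response column are assumptions made for the example; they are not part of adcp, and a real PgBackend would use JSONB, TIMESTAMPTZ, and the caller's transaction::

```python
import json
import sqlite3

# Assumption: SQLite stands in for PostgreSQL. Its default BINARY collation
# is case-sensitive, which mirrors the intent of COLLATE "C" in the schema.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE adcp_idempotency (
        principal_id TEXT NOT NULL,
        key          TEXT NOT NULL,
        payload_hash TEXT NOT NULL,
        response     TEXT NOT NULL,   -- JSONB in the PostgreSQL sketch
        expires_at   REAL NOT NULL,   -- TIMESTAMPTZ in the PostgreSQL sketch
        PRIMARY KEY (principal_id, key)
    )
    """
)


def put(principal_id, key, payload_hash, response, expires_at):
    """Upsert one cache entry (hypothetical helper, not the adcp API)."""
    conn.execute(
        """
        INSERT INTO adcp_idempotency
            (principal_id, key, payload_hash, response, expires_at)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT (principal_id, key) DO UPDATE SET
            payload_hash = excluded.payload_hash,
            response     = excluded.response,
            expires_at   = excluded.expires_at
        """,
        (principal_id, key, payload_hash, json.dumps(response), expires_at),
    )


def get(principal_id, key, now):
    """Return (payload_hash, response) for a live entry, else None."""
    row = conn.execute(
        "SELECT payload_hash, response FROM adcp_idempotency "
        "WHERE principal_id = ? AND key = ? AND expires_at > ?",
        (principal_id, key, now),
    ).fetchone()
    if row is None:
        return None
    return row[0], json.loads(row[1])
```

The lookup always filters on principal_id and on the expiry, so an expired row or a differently cased principal behaves like a cache miss.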

class WebhookDedupStore (backend: IdempotencyBackend,
ttl_seconds: int = 86400,
*,
namespace: str = 'webhook',
clock: Callable[[], float] = time.time)
class WebhookDedupStore:
    """Dedup ``(sender_id, idempotency_key)`` pairs to suppress retried webhooks.

    :param backend: any :class:`IdempotencyBackend`. Same MemoryBackend or
        PgBackend type used by :class:`IdempotencyStore` is fine — the
        ``namespace`` parameter prefixes all sender IDs so request-side and
        webhook-side scopes can't alias even when sharing one backend instance.
    :param ttl_seconds: replay window in seconds. Must be within
        ``[86400, 604800]`` (24 hours to 7 days); the lower bound is the
        spec minimum. Defaults to 86400 (24h).
    :param namespace: prefix applied to every ``sender_id`` before it hits
        the backend. Defaults to ``"webhook"``, which is safe when the same
        backend is shared with :class:`IdempotencyStore` (request-side keys
        are scoped by a principal_id that isn't wrapped in this namespace,
        so collisions are impossible). Override only if you run multiple
        webhook scopes against one backend (e.g., separate dedup spaces for
        task webhooks vs list-change webhooks).
    """

    def __init__(
        self,
        backend: IdempotencyBackend,
        ttl_seconds: int = _MIN_TTL_SECONDS,
        *,
        namespace: str = "webhook",
        clock: Callable[[], float] = time.time,
    ) -> None:
        if not _MIN_TTL_SECONDS <= ttl_seconds <= _MAX_TTL_SECONDS:
            raise ValueError(
                f"ttl_seconds must be in [{_MIN_TTL_SECONDS}, {_MAX_TTL_SECONDS}] "
                f"per webhook spec minimum, got {ttl_seconds}"
            )
        if not namespace:
            raise ValueError("namespace must be a non-empty string")
        self.backend = backend
        self.ttl_seconds = ttl_seconds
        self.namespace = namespace
        self._clock = clock

    async def check_and_record(self, sender_id: str, idempotency_key: str) -> bool:
        """Check whether the pair is first-seen and record it if new.

        Returns ``True`` when the pair is first-seen (event should be
        processed), ``False`` on duplicate (caller MUST still return 2xx to
        the sender — the event was delivered successfully, it's just a retry).

        Race note: the check-then-put pattern is not atomic across concurrent
        callers unless the backend provides its own atomicity. MemoryBackend
        serializes individual ``get`` and ``put`` under an ``asyncio.Lock`` but
        does NOT bracket them together — two concurrent retries of the same
        event CAN both observe "first-seen" and both process the event. That's
        a tolerable failure mode: the ultimate guarantee is "at most once per
        replay window in the common case"; a concurrent retry arriving in the
        same few milliseconds is rare and, if it happens, produces the same
        "duplicated side effect" outcome the at-least-once contract already
        warns callers to tolerate. PgBackend implementations SHOULD use
        ``INSERT ... ON CONFLICT DO NOTHING`` returning ``rowcount`` for
        lock-free atomicity.
        """
        if not sender_id:
            raise ValueError("sender_id must be a non-empty string")
        if not idempotency_key:
            raise ValueError("idempotency_key must be a non-empty string")

        scoped_sender = f"{self.namespace}:{sender_id}"
        existing = await self.backend.get(scoped_sender, idempotency_key)
        if existing is not None:
            logger.debug(
                "webhook dedup: duplicate sender=%s key_prefix=%s",
                sender_id,
                idempotency_key[:8],
            )
            return False

        entry = CachedResponse(
            payload_hash=_SENTINEL_HASH,
            response={},
            expires_at_epoch=self._clock() + self.ttl_seconds,
        )
        try:
            await self.backend.put(scoped_sender, idempotency_key, entry)
        except Exception:
            # Same fail-open reasoning as the request-side store: log and
            # process. Swallowing the put failure means this event MIGHT
            # reprocess on retry, not that we drop it. Better than raising,
            # which would look like handler failure to the sender.
            logger.warning(
                "webhook dedup put failed for sender=%s key_prefix=%s — "
                "event processed but next retry will reprocess",
                sender_id,
                idempotency_key[:8],
                exc_info=True,
            )
        return True

Dedup (sender_id, idempotency_key) pairs to suppress retried webhooks.

:param backend: any :class:IdempotencyBackend. The same MemoryBackend or PgBackend type used by :class:IdempotencyStore is fine: the namespace parameter prefixes all sender IDs, so request-side and webhook-side scopes can't alias even when sharing one backend instance.

:param ttl_seconds: replay window in seconds. Must be within [86400, 604800] (24 hours to 7 days); the lower bound is the spec minimum. Defaults to 86400 (24h).

:param namespace: prefix applied to every sender_id before it hits the backend. Defaults to "webhook", which is safe when the same backend is shared with :class:IdempotencyStore (request-side keys are scoped by a principal_id that isn't wrapped in this namespace, so collisions are impossible). Override only if you run multiple webhook scopes against one backend (e.g., separate dedup spaces for task webhooks vs list-change webhooks).
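
A receiver might wire the store into a webhook endpoint roughly as follows. This is a sketch under stated assumptions: FakeDedupStore is a hypothetical in-memory stand-in that only mimics the check_and_record contract so the example runs without adcp installed, and handle_webhook and its integer status return are illustrative names, not part of the library::

```python
import asyncio


class FakeDedupStore:
    """Hypothetical stand-in with the same check_and_record contract."""

    def __init__(self):
        self._seen = set()

    async def check_and_record(self, sender_id, idempotency_key):
        pair = (sender_id, idempotency_key)
        if pair in self._seen:
            return False  # duplicate delivery
        self._seen.add(pair)
        return True  # first-seen


async def handle_webhook(store, sender_id, idempotency_key, process):
    """Process an event once and return an HTTP-style status code."""
    first_seen = await store.check_and_record(sender_id, idempotency_key)
    if first_seen:
        await process()
    # A duplicate is still acknowledged with 2xx: the delivery succeeded,
    # it was just a retry.
    return 200


async def demo():
    store = FakeDedupStore()
    calls = []

    async def process():
        calls.append("processed")

    first = await handle_webhook(store, "seller-1", "evt-123", process)
    retry = await handle_webhook(store, "seller-1", "evt-123", process)
    return first, retry, calls


result = asyncio.run(demo())
```

With a real WebhookDedupStore in place of the stand-in, the handler body would stay the same; only the construction of the store changes.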

Methods

async def check_and_record(self, sender_id: str, idempotency_key: str) -> bool
async def check_and_record(self, sender_id: str, idempotency_key: str) -> bool:
    """Check whether the pair is first-seen and record it if new.

    Returns ``True`` when the pair is first-seen (event should be
    processed), ``False`` on duplicate (caller MUST still return 2xx to
    the sender — the event was delivered successfully, it's just a retry).

    Race note: the check-then-put pattern is not atomic across concurrent
    callers unless the backend provides its own atomicity. MemoryBackend
    serializes individual ``get`` and ``put`` under an ``asyncio.Lock`` but
    does NOT bracket them together — two concurrent retries of the same
    event CAN both observe "first-seen" and both process the event. That's
    a tolerable failure mode: the ultimate guarantee is "at most once per
    replay window in the common case"; a concurrent retry arriving in the
    same few milliseconds is rare and, if it happens, produces the same
    "duplicated side effect" outcome the at-least-once contract already
    warns callers to tolerate. PgBackend implementations SHOULD use
    ``INSERT ... ON CONFLICT DO NOTHING`` returning ``rowcount`` for
    lock-free atomicity.
    """
    if not sender_id:
        raise ValueError("sender_id must be a non-empty string")
    if not idempotency_key:
        raise ValueError("idempotency_key must be a non-empty string")

    scoped_sender = f"{self.namespace}:{sender_id}"
    existing = await self.backend.get(scoped_sender, idempotency_key)
    if existing is not None:
        logger.debug(
            "webhook dedup: duplicate sender=%s key_prefix=%s",
            sender_id,
            idempotency_key[:8],
        )
        return False

    entry = CachedResponse(
        payload_hash=_SENTINEL_HASH,
        response={},
        expires_at_epoch=self._clock() + self.ttl_seconds,
    )
    try:
        await self.backend.put(scoped_sender, idempotency_key, entry)
    except Exception:
        # Same fail-open reasoning as the request-side store: log and
        # process. Swallowing the put failure means this event MIGHT
        # reprocess on retry, not that we drop it. Better than raising,
        # which would look like handler failure to the sender.
        logger.warning(
            "webhook dedup put failed for sender=%s key_prefix=%s — "
            "event processed but next retry will reprocess",
            sender_id,
            idempotency_key[:8],
            exc_info=True,
        )
    return True

Check whether the pair is first-seen and record it if new.

Returns True when the pair is first-seen (event should be processed), False on duplicate (caller MUST still return 2xx to the sender — the event was delivered successfully, it's just a retry).

Race note: the check-then-put pattern is not atomic across concurrent callers unless the backend provides its own atomicity. MemoryBackend serializes individual get and put under an asyncio.Lock but does NOT bracket them together — two concurrent retries of the same event CAN both observe "first-seen" and both process the event. That's a tolerable failure mode: the ultimate guarantee is "at most once per replay window in the common case"; a concurrent retry arriving in the same few milliseconds is rare and, if it happens, produces the same "duplicated side effect" outcome the at-least-once contract already warns callers to tolerate. PgBackend implementations SHOULD use INSERT … ON CONFLICT DO NOTHING returning rowcount for lock-free atomicity.
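
The single-statement approach recommended for SQL backends can be sketched as below. As before, the standard-library sqlite3 module stands in for PostgreSQL, and the webhook_dedup table and the synchronous check_and_record helper are hypothetical names chosen for the example. The insert either creates the row or does nothing, and the affected-row count tells the caller which happened, so there is no window between check and put::

```python
import sqlite3

# Assumption: SQLite (3.24 or later, for ON CONFLICT) stands in for PostgreSQL.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE webhook_dedup ("
    " sender_id  TEXT NOT NULL,"
    " key        TEXT NOT NULL,"
    " expires_at REAL NOT NULL,"
    " PRIMARY KEY (sender_id, key))"
)


def check_and_record(sender_id, key, expires_at):
    """Return True if this call inserted the pair, False if it already existed."""
    cur = conn.execute(
        "INSERT INTO webhook_dedup (sender_id, key, expires_at) "
        "VALUES (?, ?, ?) "
        "ON CONFLICT (sender_id, key) DO NOTHING",
        (sender_id, key, expires_at),
    )
    # rowcount is 1 when the row was inserted and 0 when the conflict
    # clause suppressed the insert.
    return cur.rowcount == 1
```

The sketch ignores expiry: a real backend would also need to treat expired rows as absent, for example by purging them with delete_expired before they can block a new insert.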