Module adcp.server.idempotency
Server-side idempotency middleware for AdCP mutating tool handlers.
Implements the seller side of AdCP #2315: extract idempotency_key, look up
cached responses scoped by (authenticated_principal, idempotency_key), and
replay the cached response verbatim when a subsequent request carries the same
key + canonicalized-equivalent payload. Reject key reuse with a different
payload as IDEMPOTENCY_CONFLICT.
The spec contract lives at
adcontextprotocol/adcp/docs/building/implementation/security.mdx#idempotency.
Typical usage::
    from adcp.server import ADCPHandler, IdempotencyStore, MemoryBackend, ToolContext
    from adcp.server.responses import capabilities_response

    idempotency = IdempotencyStore(
        backend=MemoryBackend(),
        ttl_seconds=86400,  # 24 hours, the recommended floor (spec range is 1h to 7d)
    )

    class MySeller(ADCPHandler):
        @idempotency.wrap
        async def create_media_buy(self, params, context=None):
            # ``params`` carries ``idempotency_key``; ``context.caller_identity``
            # identifies the principal. Without either, the middleware falls
            # through to this handler with no dedup (schema validation above
            # us rejects missing keys per AdCP #2315).
            return my_create_logic(params)

        async def get_adcp_capabilities(self, params, context=None):
            return capabilities_response(
                ["media_buy"],
                idempotency=idempotency.capability(),
            )

Callers who invoke the handler directly (tests, non-HTTP code paths) must
pass a :class:`ToolContext` with ``caller_identity`` set. That is how the
middleware scopes the cache namespace per principal::

    ctx = ToolContext(caller_identity="buyer-acme")
    result = await seller.create_media_buy(
        {"idempotency_key": key, ...}, ctx
    )

Backends:
- :class:`MemoryBackend`: in-process dict with TTL; use for tests and single-process reference implementations.
- :class:`PgBackend`: scaffold for a SQLAlchemy/asyncpg-backed store that can commit cache writes atomically with business writes. Implementation arrives in a follow-up PR.
Sub-modules
adcp.server.idempotency.backends
    Storage backends for :class:`~adcp.server.idempotency.IdempotencyStore` …
adcp.server.idempotency.canonicalize
    Canonical payload hashing for AdCP idempotency replay detection …
adcp.server.idempotency.store
    The :class:`IdempotencyStore` coordinator: canonical hashing + backend + decorator …
adcp.server.idempotency.webhook_dedup
    Webhook receiver-side dedup store …
Functions
def canonical_json_sha256(payload: dict[str, Any]) -> str
Source code::

    def canonical_json_sha256(payload: dict[str, Any]) -> str:
        """Compute the spec's canonical payload fingerprint.

        1. Strip the spec's exclusion list (see :func:`strip_excluded_fields`).
        2. RFC 8785 JCS canonicalize (stable key order, compact, UTF-8, spec-
           compliant number serialization).
        3. SHA-256 over the canonical bytes; return hex digest.

        The result is stable across all conforming JCS implementations. Two
        payloads whose hashes match are equivalent under AdCP replay semantics;
        two with different hashes are distinct and MUST be treated as a conflict
        when the caller supplies the same ``idempotency_key``.
        """
        stripped = strip_excluded_fields(payload)
        canonical = rfc8785.dumps(stripped)
        return hashlib.sha256(canonical).hexdigest()

Compute the spec's canonical payload fingerprint.
1. Strip the spec's exclusion list (see :func:`strip_excluded_fields`).
2. RFC 8785 JCS canonicalize (stable key order, compact, UTF-8, spec-compliant number serialization).
3. SHA-256 over the canonical bytes; return hex digest.
The result is stable across all conforming JCS implementations. Two payloads whose hashes match are equivalent under AdCP replay semantics; two with different hashes are distinct and MUST be treated as a conflict when the caller supplies the same ``idempotency_key``.
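The three steps can be illustrated with the standard library alone. This is only a sketch: it approximates RFC 8785 with ``json.dumps(sort_keys=True, separators=(",", ":"))``, which agrees with JCS for simple payloads of strings, integers, booleans, and nested objects but not for every number or Unicode edge case, whereas the real function calls ``rfc8785.dumps``. The ``EXCLUDED`` set and the ``approx_fingerprint`` name are hypothetical stand-ins, not part of the module.

```python
import copy
import hashlib
import json
from typing import Any

# Hypothetical exclusion list; the real one lives in EXCLUDED_FIELDS.
EXCLUDED = frozenset({"idempotency_key"})


def approx_fingerprint(payload: dict[str, Any]) -> str:
    """Approximate the strip -> canonicalize -> hash pipeline (not real JCS)."""
    # Step 1: drop excluded top-level keys from a copy of the payload.
    stripped = {k: v for k, v in copy.deepcopy(payload).items() if k not in EXCLUDED}
    # Step 2: stable key order, compact separators, UTF-8 bytes.
    canonical = json.dumps(
        stripped, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    ).encode("utf-8")
    # Step 3: SHA-256 hex digest over the canonical bytes.
    return hashlib.sha256(canonical).hexdigest()


# Same business payload, different key order and a different excluded field.
first = approx_fingerprint({"idempotency_key": "k1", "budget": 100, "name": "q3"})
retry = approx_fingerprint({"name": "q3", "budget": 100, "idempotency_key": "k2"})
# A genuinely different payload.
changed = approx_fingerprint({"idempotency_key": "k1", "budget": 200, "name": "q3"})
```

Here ``first`` and ``retry`` are equal because key order and the excluded field do not affect the digest, while ``changed`` differs because ``budget`` changed.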
def strip_excluded_fields(payload: dict[str, Any]) -> dict[str, typing.Any]
Source code::

    def strip_excluded_fields(payload: dict[str, Any]) -> dict[str, Any]:
        """Return a deep copy of ``payload`` with the spec's exclusion list removed.

        Top-level keys in :data:`EXCLUDED_FIELDS` are dropped. Nested paths in
        ``_NESTED_EXCLUSIONS`` are traversed; the final leaf key is removed if the
        traversal reaches it. Missing intermediate keys are a no-op: the
        caller's payload is free to omit the push_notification_config entirely.

        The input dict is never mutated.
        """
        out: dict[str, Any] = copy.deepcopy(payload)
        for key in EXCLUDED_FIELDS:
            out.pop(key, None)
        for path in _NESTED_EXCLUSIONS:
            _drop_nested(out, path)
        return out

Return a deep copy of ``payload`` with the spec's exclusion list removed.
Top-level keys in :data:`EXCLUDED_FIELDS` are dropped. Nested paths in ``_NESTED_EXCLUSIONS`` are traversed; the final leaf key is removed if the traversal reaches it. Missing intermediate keys are a no-op: the caller's payload is free to omit the push_notification_config entirely.
The input dict is never mutated.
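The nested traversal described above might look roughly like the following. The module's ``_drop_nested`` helper is not shown on this page, so ``drop_nested`` here is a guess at its behavior based on the docstring, and the ``("push_notification_config", "token")`` path is a hypothetical example rather than an entry from ``_NESTED_EXCLUSIONS``.

```python
import copy
from typing import Any


def drop_nested(obj: dict[str, Any], path: tuple[str, ...]) -> None:
    """Remove the leaf named by ``path`` in place; do nothing if the path is absent."""
    *parents, leaf = path
    node: Any = obj
    for key in parents:
        node = node.get(key) if isinstance(node, dict) else None
        if node is None:
            return  # missing intermediate key: no-op
    if isinstance(node, dict):
        node.pop(leaf, None)


# Hypothetical nested exclusion path, for illustration only.
PATH = ("push_notification_config", "token")

original = {
    "push_notification_config": {"url": "https://example.test/hook", "token": "s3cret"}
}
stripped = copy.deepcopy(original)  # work on a copy so the input is never mutated
drop_nested(stripped, PATH)

no_config = {"budget": 100}
untouched = copy.deepcopy(no_config)
drop_nested(untouched, PATH)  # intermediate key is missing, so nothing happens
```

Working on a deep copy is what keeps the caller's dict intact: ``original`` still carries its ``token`` after the call.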
Classes
class CachedResponse (payload_hash: str, response: dict[str, Any], expires_at_epoch: float)
Source code::

    @dataclass(frozen=True)
    class CachedResponse:
        """A single cached handler response keyed by ``(principal_id, key)``.

        :param payload_hash: Canonical JSON SHA-256 of the *original* request.
            On replay we compare the new request's hash to this value; mismatch
            is ``IDEMPOTENCY_CONFLICT``.
        :param response: The response dict the handler returned. Returned verbatim
            on replay; the seller injects ``replayed: true`` at the envelope level
            before sending.
        :param expires_at_epoch: Unix timestamp (seconds) when this entry becomes
            eligible for eviction. Reads after this time return None.
        """

        payload_hash: str
        response: dict[str, Any]
        expires_at_epoch: float

A single cached handler response keyed by ``(principal_id, key)``.
:param payload_hash: Canonical JSON SHA-256 of the original request. On replay we compare the new request's hash to this value; mismatch is ``IDEMPOTENCY_CONFLICT``.
:param response: The response dict the handler returned. Returned verbatim on replay; the seller injects ``replayed: true`` at the envelope level before sending.
:param expires_at_epoch: Unix timestamp (seconds) when this entry becomes eligible for eviction. Reads after this time return None.
Instance variables
var expires_at_epoch : float
var payload_hash : str
var response : dict[str, typing.Any]
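A small sketch of how such an entry behaves. The dataclass below is a local copy of the documented fields so the snippet runs without the package installed; ``is_live`` is a hypothetical helper that mirrors the "expired entries read as missing" rule, not a function the module exports.

```python
from dataclasses import FrozenInstanceError, dataclass
from typing import Any


@dataclass(frozen=True)
class CachedResponse:  # local copy of the documented fields, for illustration
    payload_hash: str
    response: dict[str, Any]
    expires_at_epoch: float


def is_live(entry: CachedResponse, now_epoch: float) -> bool:
    """Hypothetical helper: an entry is replayable strictly before its expiry."""
    return entry.expires_at_epoch > now_epoch


created_at = 1_000.0
entry = CachedResponse("ab12", {"media_buy_id": "mb_1"}, created_at + 86_400)

try:
    entry.payload_hash = "other"  # frozen dataclass: attribute assignment raises
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Note that ``frozen=True`` only protects the attributes. The ``response`` dict itself remains mutable, which is presumably why the store deep-copies responses before caching and again on replay.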
class IdempotencyBackend
Source code::

    class IdempotencyBackend(ABC):
        """Abstract storage backend contract.

        All methods are async. Implementations MUST be safe to call concurrently
        from multiple asyncio tasks: :class:`IdempotencyStore` does not serialize
        access on the caller's behalf.
        """

        @abstractmethod
        async def get(
            self, principal_id: str, key: str
        ) -> CachedResponse | None:
            """Return the cached entry, or None if missing or expired."""

        @abstractmethod
        async def put(
            self,
            principal_id: str,
            key: str,
            entry: CachedResponse,
        ) -> None:
            """Store ``entry`` under ``(principal_id, key)``. Overwrites any prior
            entry; the store only calls ``put`` after verifying the slot is empty
            or expired, so an overwrite in that window is a legitimate retry of
            the write itself."""

        @abstractmethod
        async def delete_expired(self, now_epoch: float | None = None) -> int:
            """Best-effort sweep of expired entries. Returns the count removed.

            Sweeping is optional; :meth:`get` MUST self-filter expired entries.
            Backends that have natural TTL primitives (Redis ``EXPIRE``, Postgres
            partial indexes) may implement this as a no-op."""

Abstract storage backend contract.
All methods are async. Implementations MUST be safe to call concurrently from multiple asyncio tasks: :class:`IdempotencyStore` does not serialize access on the caller's behalf.
Ancestors
- abc.ABC
Subclasses
- MemoryBackend
- PgBackend
Methods
async def delete_expired(self, now_epoch: float | None = None) -> int
Best-effort sweep of expired entries. Returns the count removed.
Sweeping is optional; :meth:`get` MUST self-filter expired entries. Backends that have natural TTL primitives (Redis ``EXPIRE``, Postgres partial indexes) may implement this as a no-op.
async def get(self, principal_id: str, key: str) -> CachedResponse | None
Return the cached entry, or None if missing or expired.
async def put(self, principal_id: str, key: str, entry: CachedResponse) -> None
Store ``entry`` under ``(principal_id, key)``. Overwrites any prior entry; the store only calls ``put`` after verifying the slot is empty or expired, so an overwrite in that window is a legitimate retry of the write itself.
class IdempotencyStore (backend: IdempotencyBackend,
ttl_seconds: int = 86400,
hash_fn: Callable[[dict[str, Any]], str] = canonical_json_sha256,
*,
clock: Callable[[], float] = time.time)
Source code::

    class IdempotencyStore:
        """Coordinator that binds canonical hashing to a storage backend.

        :param backend: A concrete :class:`IdempotencyBackend`.
        :param ttl_seconds: How long cached responses remain replayable. Must be
            within the spec's ``[3600, 604800]`` range (1h to 7d). 86400 (24h) is
            the recommended floor and matches the compliance storyboard.
        :param hash_fn: Optional override for the canonical hash function.
            Defaults to :func:`canonical_json_sha256`. Exposed for tests and
            for anyone who wants to experiment with alternative equivalence
            rules, though note the spec mandates RFC 8785 JCS for interop.
        """

        def __init__(
            self,
            backend: IdempotencyBackend,
            ttl_seconds: int = 86400,
            hash_fn: Callable[[dict[str, Any]], str] = canonical_json_sha256,
            *,
            clock: Callable[[], float] = time.time,
        ) -> None:
            if not _MIN_TTL_SECONDS <= ttl_seconds <= _MAX_TTL_SECONDS:
                raise ValueError(
                    f"ttl_seconds must be in [{_MIN_TTL_SECONDS}, {_MAX_TTL_SECONDS}] "
                    f"per AdCP spec (capabilities.idempotency.replay_ttl_seconds), "
                    f"got {ttl_seconds}"
                )
            self.backend = backend
            self.ttl_seconds = ttl_seconds
            self._hash_fn = hash_fn
            self._clock = clock

        def capability(self) -> dict[str, Any]:
            """Return the capabilities fragment declaring this store's replay window.

            Embed under ``capabilities.adcp.idempotency`` on the seller's
            ``get_adcp_capabilities`` response. Buyers read this to reason about
            retry-safe windows (AdCP #2315)::

                caps.adcp.idempotency = idempotency.capability()
                # → {"supported": True, "replay_ttl_seconds": 86400}

            ``supported`` became REQUIRED in AdCP 3.0 GA; agents emitting only
            ``replay_ttl_seconds`` fail strict schema validation on the new
            capabilities response.
            """
            return {"supported": True, "replay_ttl_seconds": self.ttl_seconds}

        def wrap(self, handler: HandlerFn) -> HandlerFn:
            """Decorator that adds idempotency semantics to an AdCP handler method.

            The wrapped handler is called as ``handler(self, params, context)``.
            ``params`` may be a dict or a Pydantic model; both are normalized to
            a dict before hashing. The return value is coerced to a dict for
            caching (via ``model_dump`` if Pydantic).

            The decorator always returns the handler's original object on a cache
            miss and a best-effort Pydantic re-validation on a hit (when the
            handler's declared return type exposes ``model_validate``). Callers
            that return raw dicts get dicts back.
            """

            @wraps(handler)
            async def _wrapped(
                handler_self: Any,
                params: Any,
                context: Any = None,
                *args: Any,
                **kwargs: Any,
            ) -> Any:
                principal_id, idempotency_key, params_dict = self._prepare(params, context)
                if principal_id is None or idempotency_key is None:
                    # No key → spec says the server MUST reject with INVALID_REQUEST.
                    # We let the handler run so validation layers above us (Pydantic,
                    # FastAPI, etc.) can reject with a typed error; the middleware's
                    # job is only to dedup when a key IS present.
                    return await handler(handler_self, params, context, *args, **kwargs)

                payload_hash = self._hash_fn(params_dict)
                cached = await self.backend.get(principal_id, idempotency_key)
                if cached is not None:
                    if cached.payload_hash == payload_hash:
                        logger.debug(
                            "idempotency replay: principal=%s key_prefix=%s",
                            principal_id,
                            idempotency_key[:8],
                        )
                        return _clone_response(cached.response)
                    # Same key, different payload: spec-defined conflict.
                    raise IdempotencyConflictError(
                        operation=getattr(handler, "__name__", "handler"),
                        errors=[
                            {
                                "code": "IDEMPOTENCY_CONFLICT",
                                "message": (
                                    "idempotency_key reused with a different payload "
                                    "(canonical hash mismatch)"
                                ),
                            }
                        ],
                    )

                response = await handler(handler_self, params, context, *args, **kwargs)
                # Deep-copy when caching so post-return mutation of the caller's
                # copy can't poison future replays. `_clone_response` also deep-
                # copies on the hit path, giving independent objects per replay.
                response_dict = copy.deepcopy(_to_dict(response))
                entry = CachedResponse(
                    payload_hash=payload_hash,
                    response=response_dict,
                    expires_at_epoch=self._clock() + self.ttl_seconds,
                )
                # Commit cache AFTER handler returns. Atomicity with the handler's
                # side effects depends on the backend: MemoryBackend is best-effort
                # (no transactional relationship to external resources); PgBackend
                # (follow-up) will commit in the same transaction when the handler
                # uses the same engine. On put failure we log loudly and return
                # the handler's response: swallowing the exception would be wrong
                # (operators need the signal that caching is broken), and raising
                # would look to the caller like the handler failed, triggering a
                # retry that re-executes side effects. Best compromise: warn
                # operators, return the result, and accept that the next retry
                # with this key will re-execute.
                try:
                    await self.backend.put(principal_id, idempotency_key, entry)
                except Exception:
                    logger.warning(
                        "Idempotency cache put failed for principal=%s key_prefix=%s - "
                        "handler completed but a subsequent retry with this key will "
                        "re-execute rather than replay. This indicates an operational "
                        "issue with the idempotency backend.",
                        principal_id,
                        idempotency_key[:8],
                        exc_info=True,
                    )
                return response

            return _wrapped

        def _prepare(self, params: Any, context: Any) -> tuple[str | None, str | None, dict[str, Any]]:
            """Normalize inputs and extract the (principal, key, params_dict) tuple.

            Returns ``(None, None, params_dict)`` when idempotency doesn't apply
            (no caller identity or no key supplied). The caller falls through to
            the plain handler in that case; validation of missing-key lives in
            the request schema, not here.
            """
            params_dict = _to_dict(params)
            idempotency_key = params_dict.get("idempotency_key")
            if not isinstance(idempotency_key, str) or not idempotency_key:
                return None, None, params_dict

            principal_id = _extract_principal_id(context)
            if principal_id is None:
                # No caller identity: we can't safely scope the key. Spec requires
                # per-principal scope; anything else is a cross-principal replay
                # attack surface. Fall through to the handler (which will process
                # the request normally: no dedup, but no security regression).
                self._warn_missing_principal_once()
                return None, None, params_dict

            return principal_id, idempotency_key, params_dict

        _missing_principal_warned: bool = False

        def _warn_missing_principal_once(self) -> None:
            """Emit a one-time warning when the middleware sees a key but no principal.

            Silent fall-through is the worst DX: the seller drops in
            ``@idempotency.wrap``, ships, and doesn't discover until incident
            review that no dedup ever happened. Fire once per store instance so
            operators see the signal without filling logs on every request.
            """
            if self._missing_principal_warned:
                return
            self._missing_principal_warned = True
            warnings.warn(
                "IdempotencyStore received a request with idempotency_key but no "
                "caller_identity on ToolContext - dedup is SKIPPED. This usually "
                "means your transport isn't populating the authenticated principal. "
                "A2A: wire an a2a-sdk auth middleware that sets ServerCallContext.user; "
                "MCP: populate ToolContext.caller_identity from your FastMCP auth "
                "middleware (see adcp.server.idempotency README). "
                "This warning fires once per IdempotencyStore instance.",
                UserWarning,
                stacklevel=3,
            )

Coordinator that binds canonical hashing to a storage backend.
:param backend: A concrete :class:`IdempotencyBackend`.
:param ttl_seconds: How long cached responses remain replayable. Must be within the spec's ``[3600, 604800]`` range (1h to 7d). 86400 (24h) is the recommended floor and matches the compliance storyboard.
:param hash_fn: Optional override for the canonical hash function. Defaults to :func:`canonical_json_sha256`. Exposed for tests and for anyone who wants to experiment with alternative equivalence rules, though note the spec mandates RFC 8785 JCS for interop.
Methods
def capability(self) -> dict[str, typing.Any]
Return the capabilities fragment declaring this store's replay window.
Embed under ``capabilities.adcp.idempotency`` on the seller's ``get_adcp_capabilities`` response. Buyers read this to reason about retry-safe windows (AdCP #2315)::

    caps.adcp.idempotency = idempotency.capability()
    # → {"supported": True, "replay_ttl_seconds": 86400}

``supported`` became REQUIRED in AdCP 3.0 GA; agents emitting only ``replay_ttl_seconds`` fail strict schema validation on the new capabilities response.
def wrap(self, handler: HandlerFn) -> Callable[..., Awaitable[typing.Any]]
Decorator that adds idempotency semantics to an AdCP handler method.
The wrapped handler is called as ``handler(self, params, context)``. ``params`` may be a dict or a Pydantic model; both are normalized to a dict before hashing. The return value is coerced to a dict for caching (via ``model_dump`` if Pydantic).
The decorator always returns the handler's original object on a cache miss and a best-effort Pydantic re-validation on a hit (when the handler's declared return type exposes ``model_validate``). Callers that return raw dicts get dicts back.
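The decision flow that ``wrap`` applies (fall through without a key, replay on a matching hash, conflict on a mismatch, otherwise run and cache) can be sketched in a few lines. Everything here is a simplified stand-in under stated assumptions: ``dedup``, ``Conflict``, ``toy_hash``, and the plain-dict cache are hypothetical, and the sketch omits TTLs, Pydantic handling, and the put-failure path of the real decorator.

```python
import asyncio
import copy
from typing import Any, Awaitable, Callable

Params = dict[str, Any]
Cache = dict[tuple[str, str], tuple[str, Params]]


class Conflict(Exception):
    """Stand-in for IdempotencyConflictError."""


def dedup(cache: Cache, hash_fn: Callable[[Params], str]):
    """Hypothetical, simplified decorator mirroring the replay/conflict rules."""

    def decorate(handler: Callable[[Params], Awaitable[Params]]):
        async def wrapped(principal: str, params: Params) -> Params:
            key = params.get("idempotency_key")
            if not key:
                return await handler(params)  # no key: fall through, no dedup
            digest = hash_fn(params)
            hit = cache.get((principal, key))
            if hit is not None:
                if hit[0] != digest:
                    raise Conflict("IDEMPOTENCY_CONFLICT")  # same key, new payload
                return copy.deepcopy(hit[1])  # replay the cached response
            response = await handler(params)
            cache[(principal, key)] = (digest, copy.deepcopy(response))
            return response

        return wrapped

    return decorate


def toy_hash(params: Params) -> str:
    # Toy equivalence rule: ignore the key itself, compare the rest.
    return repr(sorted((k, v) for k, v in params.items() if k != "idempotency_key"))


calls: list[int] = []  # records each real handler execution


@dedup({}, toy_hash)
async def create(params: Params) -> Params:
    calls.append(1)
    return {"media_buy_id": f"mb_{len(calls)}"}


async def demo():
    first = await create("buyer-acme", {"idempotency_key": "k1", "budget": 100})
    replay = await create("buyer-acme", {"idempotency_key": "k1", "budget": 100})
    other = await create("buyer-zeta", {"idempotency_key": "k1", "budget": 100})
    try:
        await create("buyer-acme", {"idempotency_key": "k1", "budget": 999})
        conflict = False
    except Conflict:
        conflict = True
    return first, replay, other, conflict


first, replay, other, conflict = asyncio.run(demo())
```

The second principal reusing ``k1`` runs the handler again because the cache is scoped by ``(principal, key)``, which is the per-principal isolation the module describes.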
class MemoryBackend (*, clock: Callable[[], float] = time.time)
Source code::

    class MemoryBackend(IdempotencyBackend):
        """In-process dict-backed store.

        Suitable for tests, single-process reference implementations, and local
        development. **Not suitable for multi-process deployments**: each worker
        has its own cache, so a retry that lands on a different worker is
        treated as a fresh request.

        Thread safety: the backend uses an :class:`asyncio.Lock` to serialize
        mutations of the shared dict. Reads go through the lock too; for a pure
        in-process backend this is cheap and prevents torn reads across
        concurrent ``get``/``put`` interleaving.

        :param clock: Callable returning the current epoch seconds. Override for
            tests that need to advance time deterministically without
            monkeypatching :mod:`time`. Defaults to :func:`time.time`.
        """

        def __init__(self, *, clock: Callable[[], float] = time.time) -> None:
            self._store: dict[tuple[str, str], CachedResponse] = {}
            self._lock = asyncio.Lock()
            self._clock = clock

        async def get(
            self, principal_id: str, key: str
        ) -> CachedResponse | None:
            async with self._lock:
                entry = self._store.get((principal_id, key))
                if entry is None:
                    return None
                if entry.expires_at_epoch <= self._clock():
                    # Lazy expiry: drop the stale entry so the next request
                    # treats the slot as fresh and races to repopulate.
                    del self._store[(principal_id, key)]
                    return None
                return entry

        async def put(
            self,
            principal_id: str,
            key: str,
            entry: CachedResponse,
        ) -> None:
            async with self._lock:
                self._store[(principal_id, key)] = entry

        async def delete_expired(self, now_epoch: float | None = None) -> int:
            cutoff = now_epoch if now_epoch is not None else self._clock()
            async with self._lock:
                stale = [k for k, v in self._store.items() if v.expires_at_epoch <= cutoff]
                for k in stale:
                    del self._store[k]
                return len(stale)

        async def clear(self) -> None:
            """Remove all cached entries.

            Test-suite hook, handy for resetting state between fixtures when a
            single :class:`MemoryBackend` is shared across multiple tests.
            """
            async with self._lock:
                self._store.clear()

        async def _size(self) -> int:
            """Test-only: return the current entry count."""
            async with self._lock:
                return len(self._store)

In-process dict-backed store.
Suitable for tests, single-process reference implementations, and local development. **Not suitable for multi-process deployments**: each worker has its own cache, so a retry that lands on a different worker is treated as a fresh request.
Thread safety: the backend uses an :class:`asyncio.Lock` to serialize mutations of the shared dict. Reads go through the lock too; for a pure in-process backend this is cheap and prevents torn reads across concurrent ``get``/``put`` interleaving.
:param clock: Callable returning the current epoch seconds. Override for tests that need to advance time deterministically without monkeypatching :mod:`time`. Defaults to :func:`time.time`.
Ancestors
- IdempotencyBackend
- abc.ABC
Methods
async def clear(self) -> None
Remove all cached entries.
Test-suite hook, handy for resetting state between fixtures when a single :class:`MemoryBackend` is shared across multiple tests.
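The injectable ``clock`` that :class:`MemoryBackend` accepts makes expiry testable without sleeping. The sketch below shows the pattern against a simplified stand-in so it runs without the package installed; ``FakeClock`` and ``TinyTTLStore`` are hypothetical names, and only the lazy-expiry rule (an entry is gone once ``expires_at <= now``) is taken from the source above. One caveat worth keeping in mind: an ``asyncio.Lock`` coordinates tasks on a single event loop and is not a cross-thread lock.

```python
import asyncio
from typing import Callable, Optional


class FakeClock:
    """Hypothetical test helper: a callable clock that only moves when told to."""

    def __init__(self, start: float = 0.0) -> None:
        self.now = start

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


class TinyTTLStore:
    """Simplified stand-in for MemoryBackend: lock-guarded dict with lazy expiry."""

    def __init__(self, *, clock: Callable[[], float]) -> None:
        self._entries: dict[tuple[str, str], tuple[str, float]] = {}
        self._lock = asyncio.Lock()
        self._clock = clock

    async def put(self, principal_id: str, key: str, value: str, ttl: float) -> None:
        async with self._lock:
            self._entries[(principal_id, key)] = (value, self._clock() + ttl)

    async def get(self, principal_id: str, key: str) -> Optional[str]:
        async with self._lock:
            entry = self._entries.get((principal_id, key))
            if entry is None:
                return None
            value, expires_at = entry
            if expires_at <= self._clock():
                del self._entries[(principal_id, key)]  # lazy expiry on read
                return None
            return value


async def demo():
    clock = FakeClock(start=1_000.0)
    store = TinyTTLStore(clock=clock)  # created inside the running loop
    await store.put("buyer-acme", "k1", "cached", ttl=3600)
    before = await store.get("buyer-acme", "k1")
    clock.advance(3600)  # jump exactly to the expiry instant
    after = await store.get("buyer-acme", "k1")
    return before, after


before, after = asyncio.run(demo())
```

With the real classes, the same fake clock would presumably be passed to both ``MemoryBackend(clock=...)`` and ``IdempotencyStore(..., clock=...)`` so that writes and reads agree on the current time.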
Inherited members
class PgBackend (*args: Any, **kwargs: Any)
Source code::

    class PgBackend(IdempotencyBackend):
        """PostgreSQL-backed store: **scaffold, not yet implemented**.

        .. warning::

            Calling ``PgBackend(...)`` raises ``NotImplementedError`` today. Use
            :class:`MemoryBackend` for tests, or implement your own
            :class:`IdempotencyBackend` subclass against the database of your
            choice until this implementation lands. Tracked at
            https://github.com/adcontextprotocol/adcp-client-python/issues/182.

        **Design intent.** Share a transaction with the handler's business writes
        so the cache entry commits atomically with side effects. Without that, a
        crash between ``handler success`` and ``cache commit`` causes the retry
        to re-execute the handler, duplicating side effects.

        **Schema sketch for the implementer.**

        .. code-block:: sql

            CREATE TABLE adcp_idempotency (
                principal_id TEXT COLLATE "C" NOT NULL,
                key          TEXT COLLATE "C" NOT NULL,
                payload_hash TEXT NOT NULL,
                response     JSONB NOT NULL,
                expires_at   TIMESTAMPTZ NOT NULL,
                PRIMARY KEY (principal_id, key)
            );

        Notes:

        * ``COLLATE "C"`` (or ``CITEXT`` with a deliberate case policy): avoid
          the default locale collation on the identifier columns. On some
          locales ``Principal-A`` and ``principal-a`` compare equal, which would
          collapse distinct tenants into the same cache slot.
        * Queries MUST filter on ``principal_id`` in the ``WHERE`` clause even
          with the composite PK; row-level security (RLS) enforced via a policy
          like ``USING (principal_id = current_setting('adcp.principal_id')::text)``
          gives belt-and-suspenders protection against accidental cross-tenant
          reads in future handlers.
        * ``get`` uses ``SELECT ... WHERE expires_at > now()``.
        * ``put`` uses ``INSERT ... ON CONFLICT (principal_id, key) DO UPDATE``.
        * Accept a SQLAlchemy/asyncpg session factory so the caller can thread
          the handler's transaction through for atomic commit; the atomicity
          guarantee is the whole reason to use a SQL backend.
        """

        def __init__(self, *args: Any, **kwargs: Any) -> None:
            raise NotImplementedError(
                "PgBackend is scaffolded but not yet implemented. Use MemoryBackend "
                "for tests, or implement your own IdempotencyBackend subclass "
                "against your database of choice until the PgBackend implementation "
                "lands. Tracking: "
                "https://github.com/adcontextprotocol/adcp-client-python/issues/182."
            )

        async def get(
            self, principal_id: str, key: str
        ) -> CachedResponse | None:  # pragma: no cover
            raise NotImplementedError

        async def put(
            self,
            principal_id: str,
            key: str,
            entry: CachedResponse,
        ) -> None:  # pragma: no cover
            raise NotImplementedError

        async def delete_expired(
            self, now_epoch: float | None = None
        ) -> int:  # pragma: no cover
            raise NotImplementedError

PostgreSQL-backed store: **scaffold, not yet implemented**.
Warning
Calling ``PgBackend(...)`` raises ``NotImplementedError`` today. Use :class:`MemoryBackend` for tests, or implement your own :class:`IdempotencyBackend` subclass against the database of your choice until this implementation lands. Tracked at https://github.com/adcontextprotocol/adcp-client-python/issues/182.
**Design intent.** Share a transaction with the handler's business writes so the cache entry commits atomically with side effects. Without that, a crash between ``handler success`` and ``cache commit`` causes the retry to re-execute the handler, duplicating side effects.
**Schema sketch for the implementer.**
.. code-block:: sql

    CREATE TABLE adcp_idempotency (
        principal_id TEXT COLLATE "C" NOT NULL,
        key          TEXT COLLATE "C" NOT NULL,
        payload_hash TEXT NOT NULL,
        response     JSONB NOT NULL,
        expires_at   TIMESTAMPTZ NOT NULL,
        PRIMARY KEY (principal_id, key)
    );

Notes:
- ``COLLATE "C"`` (or ``CITEXT`` with a deliberate case policy): avoid the default locale collation on the identifier columns. On some locales ``Principal-A`` and ``principal-a`` compare equal, which would collapse distinct tenants into the same cache slot.
- Queries MUST filter on ``principal_id`` in the ``WHERE`` clause even with the composite PK; row-level security (RLS) enforced via a policy like ``USING (principal_id = current_setting('adcp.principal_id')::text)`` gives belt-and-suspenders protection against accidental cross-tenant reads in future handlers.
- ``get`` uses ``SELECT ... WHERE expires_at > now()``.
- ``put`` uses ``INSERT ... ON CONFLICT (principal_id, key) DO UPDATE``.
- Accept a SQLAlchemy/asyncpg session factory so the caller can thread the handler's transaction through for atomic commit; the atomicity guarantee is the whole reason to use a SQL backend.
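The ``get`` and ``put`` query shapes in the notes above can be tried out with the standard library's ``sqlite3`` as a stand-in for PostgreSQL. This is only a sketch of the SQL pattern, not the planned ``PgBackend``: SQLite has no ``JSONB`` or ``TIMESTAMPTZ``, so the response is stored as JSON text and the expiry as epoch seconds, the collation and RLS notes do not carry over, and the synchronous ``put``/``get`` helpers are hypothetical. The upsert syntax assumes SQLite 3.24 or newer.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE adcp_idempotency (
        principal_id TEXT NOT NULL,
        key          TEXT NOT NULL,
        payload_hash TEXT NOT NULL,
        response     TEXT NOT NULL,  -- JSONB in the PostgreSQL sketch
        expires_at   REAL NOT NULL,  -- TIMESTAMPTZ in the PostgreSQL sketch
        PRIMARY KEY (principal_id, key)
    )
    """
)


def put(principal_id, key, payload_hash, response, expires_at):
    """Upsert one entry, overwriting any prior row for (principal_id, key)."""
    conn.execute(
        """
        INSERT INTO adcp_idempotency VALUES (?, ?, ?, ?, ?)
        ON CONFLICT (principal_id, key) DO UPDATE SET
            payload_hash = excluded.payload_hash,
            response     = excluded.response,
            expires_at   = excluded.expires_at
        """,
        (principal_id, key, payload_hash, json.dumps(response), expires_at),
    )


def get(principal_id, key, now):
    """Return (payload_hash, response) or None; expired rows are filtered out."""
    row = conn.execute(
        """
        SELECT payload_hash, response FROM adcp_idempotency
        WHERE principal_id = ? AND key = ? AND expires_at > ?
        """,
        (principal_id, key, now),
    ).fetchone()
    return None if row is None else (row[0], json.loads(row[1]))


put("buyer-acme", "k1", "h1", {"media_buy_id": "mb_1"}, expires_at=100.0)
live = get("buyer-acme", "k1", now=50.0)
other_tenant = get("buyer-zeta", "k1", now=50.0)  # scoped by principal_id
put("buyer-acme", "k1", "h2", {"media_buy_id": "mb_2"}, expires_at=200.0)
overwritten = get("buyer-acme", "k1", now=50.0)
expired = get("buyer-acme", "k1", now=200.0)  # expires_at > now is false
```

A real implementation would run these statements inside the handler's own transaction, since sharing that transaction is the stated reason for a SQL backend.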
Ancestors
- IdempotencyBackend
- abc.ABC
Inherited members
class WebhookDedupStore (backend: IdempotencyBackend,
ttl_seconds: int = 86400,
*,
namespace: str = 'webhook',
clock: Callable[[], float] = <built-in function time>)-
Expand source code
class WebhookDedupStore:
    """Dedup ``(sender_id, idempotency_key)`` pairs to suppress retried webhooks.

    :param backend: any :class:`IdempotencyBackend`. Same MemoryBackend or
        PgBackend type used by :class:`IdempotencyStore` is fine — the
        ``namespace`` parameter prefixes all sender IDs so request-side and
        webhook-side scopes can't alias even when sharing one backend
        instance.
    :param ttl_seconds: replay window. Must be within ``[86400, 604800]`` per
        the spec minimum. Defaults to 86400 (24h).
    :param namespace: prefix applied to every ``sender_id`` before it hits
        the backend. Defaults to ``"webhook"``, which is safe when the same
        backend is shared with :class:`IdempotencyStore` (request-side keys
        are scoped by a principal_id that isn't wrapped in this namespace, so
        collisions are impossible). Override only if you run multiple webhook
        scopes against one backend (e.g., separate dedup spaces for task
        webhooks vs list-change webhooks).
    """

    def __init__(
        self,
        backend: IdempotencyBackend,
        ttl_seconds: int = _MIN_TTL_SECONDS,
        *,
        namespace: str = "webhook",
        clock: Callable[[], float] = time.time,
    ) -> None:
        if not _MIN_TTL_SECONDS <= ttl_seconds <= _MAX_TTL_SECONDS:
            raise ValueError(
                f"ttl_seconds must be in [{_MIN_TTL_SECONDS}, {_MAX_TTL_SECONDS}] "
                f"per webhook spec minimum, got {ttl_seconds}"
            )
        if not namespace:
            raise ValueError("namespace must be a non-empty string")
        self.backend = backend
        self.ttl_seconds = ttl_seconds
        self.namespace = namespace
        self._clock = clock

    async def check_and_record(self, sender_id: str, idempotency_key: str) -> bool:
        """Atomically check for first-seen and record if new.

        Returns ``True`` when the pair is first-seen (event should be
        processed), ``False`` on duplicate (caller MUST still return 2xx to
        the sender — the event was delivered successfully, it's just a
        retry).

        Race note: the check-then-put pattern is not atomic across
        concurrent callers unless the backend provides its own atomicity.
        MemoryBackend serializes individual ``get`` and ``put`` under an
        ``asyncio.Lock`` but does NOT bracket them together — two concurrent
        retries of the same event CAN both observe "first-seen" and both
        process the event. That's a tolerable failure mode: the ultimate
        guarantee is "at most once per replay window in the common case"; a
        concurrent retry arriving in the same few milliseconds is rare and,
        if it happens, produces the same "duplicated side effect" outcome
        the at-least-once contract already warns callers to tolerate.
        PgBackend implementations SHOULD use ``INSERT ... ON CONFLICT DO
        NOTHING`` returning ``rowcount`` for lock-free atomicity.
        """
        if not sender_id:
            raise ValueError("sender_id must be a non-empty string")
        if not idempotency_key:
            raise ValueError("idempotency_key must be a non-empty string")
        scoped_sender = f"{self.namespace}:{sender_id}"
        existing = await self.backend.get(scoped_sender, idempotency_key)
        if existing is not None:
            logger.debug(
                "webhook dedup: duplicate sender=%s key_prefix=%s",
                sender_id,
                idempotency_key[:8],
            )
            return False
        entry = CachedResponse(
            payload_hash=_SENTINEL_HASH,
            response={},
            expires_at_epoch=self._clock() + self.ttl_seconds,
        )
        try:
            await self.backend.put(scoped_sender, idempotency_key, entry)
        except Exception:
            # Same fail-open reasoning as the request-side store: log and
            # process. Swallowing the put failure means this event MIGHT
            # reprocess on retry, not that we drop it. Better than raising,
            # which would look like handler failure to the sender.
            logger.warning(
                "webhook dedup put failed for sender=%s key_prefix=%s — "
                "event processed but next retry will reprocess",
                sender_id,
                idempotency_key[:8],
                exc_info=True,
            )
        return True

Dedup ``(sender_id, idempotency_key)`` pairs to suppress retried webhooks.

:param backend: any :class:`IdempotencyBackend`. The same MemoryBackend or PgBackend type used by :class:`IdempotencyStore` is fine: the ``namespace`` parameter prefixes all sender IDs, so request-side and webhook-side scopes can't alias even when they share one backend instance.

:param ttl_seconds: replay window in seconds. Must be within ``[86400, 604800]`` (24 hours to 7 days); the lower bound is the spec minimum. Defaults to 86400 (24h).

:param namespace: prefix applied to every ``sender_id`` before it hits the backend. Defaults to ``"webhook"``, which is safe when the same backend is shared with :class:`IdempotencyStore` (request-side keys are scoped by a principal_id that isn't wrapped in this namespace, so collisions are impossible). Override only if you run multiple webhook scopes against one backend (e.g., separate dedup spaces for task webhooks vs list-change webhooks).

Methods
async def check_and_record(self, sender_id: str, idempotency_key: str) -> bool
Check whether the pair is first-seen and record it if new. The operation is atomic only when the backend provides its own atomicity (see the race note).

Returns ``True`` when the pair is first-seen (event should be processed), ``False`` on duplicate. On a duplicate the caller MUST still return 2xx to the sender: the event was delivered successfully, it's just a retry.

Race note: the check-then-put pattern is not atomic across concurrent callers unless the backend provides its own atomicity. MemoryBackend serializes individual ``get`` and ``put`` calls under an ``asyncio.Lock`` but does NOT bracket them together, so two concurrent retries of the same event CAN both observe "first-seen" and both process the event. That's a tolerable failure mode: the ultimate guarantee is "at most once per replay window in the common case"; a concurrent retry arriving in the same few milliseconds is rare and, if it happens, produces the same "duplicated side effect" outcome the at-least-once contract already warns callers to tolerate. PgBackend implementations SHOULD use ``INSERT ... ON CONFLICT DO NOTHING`` and check ``rowcount`` for lock-free atomicity.

If the backend ``put`` fails, the store fails open: it logs a warning and still returns ``True``, so the event is processed now and may be reprocessed on the next retry.
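The atomic variant recommended in the race note can be sketched as follows. This is a minimal illustration, not the library's PgBackend: it uses the standard-library ``sqlite3`` module as a stand-in for Postgres, and the table name ``webhook_dedup``, its columns, and the helpers ``make_store`` and ``first_seen`` are all hypothetical. Expiry pruning is left out to keep the focus on the single-statement check-and-record.

```python
import sqlite3
import time
from typing import Optional


def make_store() -> sqlite3.Connection:
    # In-memory SQLite stands in for Postgres. Table and column names are
    # hypothetical, chosen only for this sketch.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE webhook_dedup (
            scoped_sender    TEXT NOT NULL,
            idempotency_key  TEXT NOT NULL,
            expires_at_epoch REAL NOT NULL,
            PRIMARY KEY (scoped_sender, idempotency_key)
        )
        """
    )
    return conn


def first_seen(
    conn: sqlite3.Connection,
    sender_id: str,
    idempotency_key: str,
    *,
    ttl_seconds: int = 86400,
    namespace: str = "webhook",
    now: Optional[float] = None,
) -> bool:
    """Return True if the pair was newly recorded, False if it already existed."""
    if now is None:
        now = time.time()
    # Same scoping idea as the store above: prefix the sender with a namespace.
    scoped_sender = f"{namespace}:{sender_id}"
    with conn:  # commit on success, roll back on error
        cur = conn.execute(
            "INSERT INTO webhook_dedup "
            "(scoped_sender, idempotency_key, expires_at_epoch) "
            "VALUES (?, ?, ?) "
            "ON CONFLICT (scoped_sender, idempotency_key) DO NOTHING",
            (scoped_sender, idempotency_key, now + ttl_seconds),
        )
    # One row inserted means first-seen; zero rows means the primary key
    # already existed, i.e. a duplicate delivery.
    return cur.rowcount == 1
```

Because the primary-key constraint decides the winner inside a single statement, there is no window between the check and the write in which two concurrent retries could both see "first-seen". A real implementation would also need to treat expired rows as absent, which this sketch does not attempt.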