Module adcp.server.idempotency.store
The `IdempotencyStore` coordinator: canonical hashing + backend + decorator.
Responsibilities:
- Extract `idempotency_key` from the incoming request.
- Scope lookups by `(principal_id, key)` via the backend.
- On a cache hit with a matching canonical payload hash: return the cached response and mark `replayed=True` on the envelope.
- On a cache hit with a different hash: raise `IdempotencyConflictError`.
- On a miss: run the wrapped handler, then commit `(hash, response)` to the backend.
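The responsibilities above can be sketched as a minimal, self-contained coordinator. Everything here is hypothetical and simplified: `MiniStore`, `payload_hash`, `Entry`, and `ConflictError` are illustrative names, the backend is a plain dict with no TTL, hashing uses sorted-key JSON as an approximation of canonical hashing, and instead of marking `replayed=True` on an envelope the sketch returns a `(response, replayed)` pair.

```python
import asyncio
import copy
import hashlib
import json
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


def payload_hash(params: dict[str, Any]) -> str:
    # Sorted keys + compact separators approximate canonical JSON; this is
    # NOT a full RFC 8785 JCS implementation (e.g. number formatting).
    canon = json.dumps(params, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(canon.encode("utf-8")).hexdigest()


@dataclass
class Entry:
    payload_hash: str
    response: dict[str, Any]


class ConflictError(Exception):
    """Hypothetical stand-in for IdempotencyConflictError."""


class MiniStore:
    """Hypothetical, simplified coordinator: dict backend, no TTL."""

    def __init__(self) -> None:
        self._entries: dict[tuple[str, str], Entry] = {}

    async def run(
        self, principal_id: str, params: dict[str, Any], handler: Handler
    ) -> tuple[dict[str, Any], bool]:
        key = params["idempotency_key"]
        h = payload_hash(params)
        cached = self._entries.get((principal_id, key))
        if cached is not None:
            if cached.payload_hash == h:
                # Hit, same payload: replay an independent copy, flagged as replayed.
                return copy.deepcopy(cached.response), True
            # Hit, different payload: conflict.
            raise ConflictError("idempotency_key reused with a different payload")
        # Miss: run the handler, then commit (hash, deep-copied response).
        response = await handler(params)
        self._entries[(principal_id, key)] = Entry(h, copy.deepcopy(response))
        return response, False


calls = 0


async def create_buy(params: dict[str, Any]) -> dict[str, Any]:
    global calls
    calls += 1  # counts real executions of the side-effecting handler
    return {"media_buy_id": f"mb_{calls}", "budget": params["budget"]}


async def demo() -> dict[str, Any]:
    store = MiniStore()
    first, first_replayed = await store.run(
        "buyer-a", {"idempotency_key": "k-123", "budget": 100}, create_buy
    )
    first["budget"] = 999  # caller mutates its copy; the cached entry is unaffected
    # Same payload with keys in a different order: still a replay.
    retry, retry_replayed = await store.run(
        "buyer-a", {"budget": 100, "idempotency_key": "k-123"}, create_buy
    )
    try:
        await store.run("buyer-a", {"idempotency_key": "k-123", "budget": 200}, create_buy)
        conflict = False
    except ConflictError:
        conflict = True
    return {
        "first_replayed": first_replayed,
        "retry": retry,
        "retry_replayed": retry_replayed,
        "conflict": conflict,
        "calls": calls,
    }


results = asyncio.run(demo())
```

The handler runs exactly once; the retry replays the original `mb_1` response with its original budget, because the entry was deep-copied at commit time and again on replay.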
Per-principal scoping is a hard security requirement (AdCP #2315): a key from principal A on seller S has no meaning for principal B. The store pulls the principal id from `ToolContext.caller_identity`. If no context or no `caller_identity` is supplied, the store refuses to deduplicate: it fails closed rather than collapse every buyer into a shared namespace. The request then falls through to the handler with no dedup, and a `UserWarning` is emitted once per store instance.
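A minimal sketch of the scoping rule, assuming a plain dict keyed by `(principal_id, key)` and a string principal standing in for `ToolContext.caller_identity` (`ScopedCache` and its methods are hypothetical). The same key under two principals maps to two separate entries, and a missing principal skips dedup entirely, warning once per instance, rather than falling back to a shared key-only namespace.

```python
import warnings
from typing import Optional


class ScopedCache:
    """Hypothetical cache keyed by (principal_id, key), never by key alone."""

    def __init__(self) -> None:
        self._entries: dict[tuple[str, str], str] = {}
        self._warned = False

    def _scope(self, principal_id: Optional[str], key: str) -> Optional[tuple[str, str]]:
        if not principal_id:
            # Fail closed: with no principal there is no safe namespace, so skip
            # dedup (warning once per instance) instead of sharing a key-only one.
            if not self._warned:
                self._warned = True
                warnings.warn(
                    "idempotency_key present but no caller identity; dedup skipped",
                    UserWarning,
                    stacklevel=3,
                )
            return None
        return (principal_id, key)

    def lookup(self, principal_id: Optional[str], key: str) -> Optional[str]:
        scoped = self._scope(principal_id, key)
        return None if scoped is None else self._entries.get(scoped)

    def commit(self, principal_id: Optional[str], key: str, response: str) -> None:
        scoped = self._scope(principal_id, key)
        if scoped is not None:
            self._entries[scoped] = response


cache = ScopedCache()
cache.commit("principal-a", "k-1", "response-for-a")
a_hit = cache.lookup("principal-a", "k-1")
b_hit = cache.lookup("principal-b", "k-1")  # same key, different principal: miss

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    cache.commit(None, "k-1", "anonymous")  # not stored
    anon_hit = cache.lookup(None, "k-1")    # no dedup; warning already emitted once
```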
Classes
class IdempotencyStore(backend: IdempotencyBackend,
                       ttl_seconds: int = 86400,
                       hash_fn: Callable[[dict[str, Any]], str] = canonical_json_sha256,
                       *,
                       clock: Callable[[], float] = time.time)
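The constructor's TTL check and the expiry arithmetic can be sketched without a real backend. This is an illustration, not the module's code: `validate_ttl`, `stamp_expiry`, and `FakeClock` are hypothetical, the bounds come from the documented `[3600, 604800]` range, and the comparison used below (an entry counts as expired once `clock() >= expires_at_epoch`) is an assumption about how a backend would treat the stamped expiry. Injecting a callable `clock` instead of calling `time.time` directly is what makes this deterministic in tests.

```python
from typing import Callable

# Bounds taken from the documented spec range [3600, 604800] (1h to 7d).
MIN_TTL_SECONDS = 3600
MAX_TTL_SECONDS = 604800


def validate_ttl(ttl_seconds: int) -> int:
    """Reject TTLs outside the spec range, as the constructor does."""
    if not MIN_TTL_SECONDS <= ttl_seconds <= MAX_TTL_SECONDS:
        raise ValueError(
            f"ttl_seconds must be in [{MIN_TTL_SECONDS}, {MAX_TTL_SECONDS}], got {ttl_seconds}"
        )
    return ttl_seconds


class FakeClock:
    """Hypothetical deterministic clock; callable like time.time."""

    def __init__(self, now: float) -> None:
        self.now = now

    def __call__(self) -> float:
        return self.now


def stamp_expiry(clock: Callable[[], float], ttl_seconds: int) -> float:
    # Same arithmetic as the store's commit path: clock() + ttl_seconds.
    return clock() + validate_ttl(ttl_seconds)


rejected = []
for bad in (60, 604801):
    try:
        validate_ttl(bad)
    except ValueError:
        rejected.append(bad)

clock = FakeClock(1_700_000_000.0)
deadline = stamp_expiry(clock, 86400)
clock.now += 86399
# Assumption: an entry counts as expired once clock() >= expires_at_epoch.
still_replayable = clock() < deadline
clock.now += 2
expired = clock() >= deadline
```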
```python
class IdempotencyStore:
    """Coordinator that binds canonical hashing to a storage backend.

    :param backend: A concrete :class:`IdempotencyBackend`.
    :param ttl_seconds: How long cached responses remain replayable. Must
        be within the spec's ``[3600, 604800]`` range (1h to 7d). 86400
        (24h) is the recommended floor and matches the compliance
        storyboard.
    :param hash_fn: Optional override for the canonical hash function.
        Defaults to :func:`canonical_json_sha256`. Exposed for tests and
        for anyone who wants to experiment with alternative equivalence
        rules — though note the spec mandates RFC 8785 JCS for interop.
    """

    def __init__(
        self,
        backend: IdempotencyBackend,
        ttl_seconds: int = 86400,
        hash_fn: Callable[[dict[str, Any]], str] = canonical_json_sha256,
        *,
        clock: Callable[[], float] = time.time,
    ) -> None:
        if not _MIN_TTL_SECONDS <= ttl_seconds <= _MAX_TTL_SECONDS:
            raise ValueError(
                f"ttl_seconds must be in [{_MIN_TTL_SECONDS}, {_MAX_TTL_SECONDS}] "
                f"per AdCP spec (capabilities.idempotency.replay_ttl_seconds), "
                f"got {ttl_seconds}"
            )
        self.backend = backend
        self.ttl_seconds = ttl_seconds
        self._hash_fn = hash_fn
        self._clock = clock

    def capability(self) -> dict[str, Any]:
        """Return the capabilities fragment declaring this store's replay window.

        Embed under ``capabilities.adcp.idempotency`` on the seller's
        ``get_adcp_capabilities`` response. Buyers read this to reason
        about retry-safe windows (AdCP #2315)::

            caps.adcp.idempotency = idempotency.capability()
            # → {"supported": True, "replay_ttl_seconds": 86400}

        ``supported`` became REQUIRED in AdCP 3.0 GA — agents emitting only
        ``replay_ttl_seconds`` fail strict schema validation on the new
        capabilities response.
        """
        return {"supported": True, "replay_ttl_seconds": self.ttl_seconds}

    def wrap(self, handler: HandlerFn) -> HandlerFn:
        """Decorator that adds idempotency semantics to an AdCP handler method.

        The wrapped handler is called as ``handler(self, params, context)``.
        ``params`` may be a dict or a Pydantic model — both are normalized
        to a dict before hashing. The return value is coerced to a dict for
        caching (via ``model_dump`` if Pydantic).

        The decorator always returns the handler's original object on a
        cache miss and a best-effort Pydantic re-validation on a hit (when
        the handler's declared return type exposes ``model_validate``).
        Callers that return raw dicts get dicts back.
        """

        @wraps(handler)
        async def _wrapped(
            handler_self: Any,
            params: Any,
            context: Any = None,
            *args: Any,
            **kwargs: Any,
        ) -> Any:
            principal_id, idempotency_key, params_dict = self._prepare(params, context)
            if principal_id is None or idempotency_key is None:
                # No key → spec says the server MUST reject with INVALID_REQUEST.
                # We let the handler run so validation layers above us (Pydantic,
                # FastAPI, etc.) can reject with a typed error; the middleware's
                # job is only to dedup when a key IS present.
                return await handler(handler_self, params, context, *args, **kwargs)

            payload_hash = self._hash_fn(params_dict)
            cached = await self.backend.get(principal_id, idempotency_key)
            if cached is not None:
                if cached.payload_hash == payload_hash:
                    logger.debug(
                        "idempotency replay: principal=%s key_prefix=%s",
                        principal_id,
                        idempotency_key[:8],
                    )
                    return _clone_response(cached.response)
                # Same key, different payload — spec-defined conflict.
                raise IdempotencyConflictError(
                    operation=getattr(handler, "__name__", "handler"),
                    errors=[
                        {
                            "code": "IDEMPOTENCY_CONFLICT",
                            "message": (
                                "idempotency_key reused with a different payload "
                                "(canonical hash mismatch)"
                            ),
                        }
                    ],
                )

            response = await handler(handler_self, params, context, *args, **kwargs)
            # Deep-copy when caching so post-return mutation of the caller's
            # copy can't poison future replays. `_clone_response` also deep-
            # copies on the hit path, giving independent objects per replay.
            response_dict = copy.deepcopy(_to_dict(response))
            entry = CachedResponse(
                payload_hash=payload_hash,
                response=response_dict,
                expires_at_epoch=self._clock() + self.ttl_seconds,
            )
            # Commit cache AFTER handler returns. Atomicity with the handler's
            # side effects depends on the backend: MemoryBackend is best-effort
            # (no transactional relationship to external resources); PgBackend
            # (follow-up) will commit in the same transaction when the handler
            # uses the same engine. On put failure we log loudly and return
            # the handler's response — swallowing the exception would be wrong
            # (operators need the signal that caching is broken), and raising
            # would look to the caller like the handler failed, triggering a
            # retry that re-executes side effects. Best compromise: warn
            # operators, return the result, and accept that the next retry
            # with this key will re-execute.
            try:
                await self.backend.put(principal_id, idempotency_key, entry)
            except Exception:
                logger.warning(
                    "Idempotency cache put failed for principal=%s key_prefix=%s — "
                    "handler completed but a subsequent retry with this key will "
                    "re-execute rather than replay. This indicates an operational "
                    "issue with the idempotency backend.",
                    principal_id,
                    idempotency_key[:8],
                    exc_info=True,
                )
            return response

        return _wrapped

    def _prepare(self, params: Any, context: Any) -> tuple[str | None, str | None, dict[str, Any]]:
        """Normalize inputs and extract the (principal, key, params_dict) tuple.

        Returns ``(None, None, params_dict)`` when idempotency doesn't apply
        (no caller identity or no key supplied). The caller falls through to
        the plain handler in that case — validation of missing-key lives in
        the request schema, not here.
        """
        params_dict = _to_dict(params)
        idempotency_key = params_dict.get("idempotency_key")
        if not isinstance(idempotency_key, str) or not idempotency_key:
            return None, None, params_dict

        principal_id = _extract_principal_id(context)
        if principal_id is None:
            # No caller identity: we can't safely scope the key. Spec requires
            # per-principal scope; anything else is a cross-principal replay
            # attack surface. Fall through to the handler (which will process
            # the request normally — no dedup, but no security regression).
            self._warn_missing_principal_once()
            return None, None, params_dict

        return principal_id, idempotency_key, params_dict

    _missing_principal_warned: bool = False

    def _warn_missing_principal_once(self) -> None:
        """Emit a one-time warning when the middleware sees a key but no principal.

        Silent fall-through is the worst DX: the seller drops in
        ``@idempotency.wrap``, ships, and doesn't discover until incident
        review that no dedup ever happened. Fire once per store instance so
        operators see the signal without filling logs on every request.
        """
        if self._missing_principal_warned:
            return
        self._missing_principal_warned = True
        warnings.warn(
            "IdempotencyStore received a request with idempotency_key but no "
            "caller_identity on ToolContext — dedup is SKIPPED. This usually "
            "means your transport isn't populating the authenticated principal. "
            "A2A: wire an a2a-sdk auth middleware that sets ServerCallContext.user; "
            "MCP: populate ToolContext.caller_identity from your FastMCP auth "
            "middleware (see adcp.server.idempotency README). "
            "This warning fires once per IdempotencyStore instance.",
            UserWarning,
            stacklevel=3,
        )
```

Coordinator that binds canonical hashing to a storage backend.
:param backend: A concrete `IdempotencyBackend`.
:param ttl_seconds: How long cached responses remain replayable. Must be within the spec's `[3600, 604800]` range (1h to 7d). 86400 (24h) is the recommended floor and matches the compliance storyboard.
:param hash_fn: Optional override for the canonical hash function. Defaults to `canonical_json_sha256`. Exposed for tests and for anyone who wants to experiment with alternative equivalence rules, though note the spec mandates RFC 8785 JCS for interop.
:param clock: Keyword-only callable returning the current epoch time in seconds, used to stamp each cached entry's expiry as `clock() + ttl_seconds`. Defaults to `time.time`.

Methods
def capability(self) -> dict[str, typing.Any]
Return the capabilities fragment declaring this store's replay window.
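The fragment is simply `{"supported": True, "replay_ttl_seconds": self.ttl_seconds}`. As a rough illustration of how a buyer might reason about retry-safe windows with it, here is a hypothetical buyer-side check; `replay_ttl` and `retry_is_replay_safe` are not part of this module. Treating a fragment without `supported: True` as "no replay guarantee" is a design choice of the sketch, not something stated above.

```python
from typing import Any, Optional


def replay_ttl(fragment: dict[str, Any]) -> Optional[int]:
    """Return the advertised replay window in seconds, or None if not usable."""
    # Sketch's design choice: without `supported: True` (required since
    # AdCP 3.0 GA) the buyer assumes no replay guarantee.
    if fragment.get("supported") is not True:
        return None
    ttl = fragment.get("replay_ttl_seconds")
    return ttl if isinstance(ttl, int) else None


def retry_is_replay_safe(fragment: dict[str, Any], first_sent_at: float, now: float) -> bool:
    """Conservative check, measuring the window from the buyer's first send.

    This store stamps expiry at commit time (after the handler returns), which
    is never earlier than the first send, so this under-estimates the window.
    """
    ttl = replay_ttl(fragment)
    return ttl is not None and now - first_sent_at < ttl


current = {"supported": True, "replay_ttl_seconds": 86400}
legacy = {"replay_ttl_seconds": 86400}  # older shape without `supported`

early = retry_is_replay_safe(current, first_sent_at=0.0, now=3600.0)
late = retry_is_replay_safe(current, first_sent_at=0.0, now=90000.0)
legacy_ok = retry_is_replay_safe(legacy, first_sent_at=0.0, now=10.0)
```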
Embed under `capabilities.adcp.idempotency` on the seller's `get_adcp_capabilities` response. Buyers read this to reason about retry-safe windows (AdCP #2315):

```python
caps.adcp.idempotency = idempotency.capability()
# → {"supported": True, "replay_ttl_seconds": 86400}
```

`supported` became REQUIRED in AdCP 3.0 GA: agents emitting only `replay_ttl_seconds` fail strict schema validation on the new capabilities response.

def wrap(self, handler: HandlerFn) -> Callable[..., Awaitable[typing.Any]]
Decorator that adds idempotency semantics to an AdCP handler method.
The wrapped handler is called as `handler(self, params, context)`. `params` may be a dict or a Pydantic model; both are normalized to a dict before hashing. The return value is coerced to a dict for caching (via `model_dump` if Pydantic).

The decorator always returns the handler's original object on a cache miss and a best-effort Pydantic re-validation on a hit (when the handler's declared return type exposes `model_validate`). Callers that return raw dicts get dicts back.
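A minimal sketch of the normalize-then-hash step, under stated assumptions: `to_dict` is a hypothetical stand-in for the module's `_to_dict` that duck-types on `model_dump`, `FakeModel` stands in for a Pydantic model, the `packages` payload is made-up sample data, and `canonical_sha256` only approximates `canonical_json_sha256` (sorted keys and compact separators, not a full RFC 8785 JCS encoder). It shows why a dict and a model carrying the same fields, in any key order, hash identically, while a changed value does not.

```python
import hashlib
import json
from typing import Any


def to_dict(params: Any) -> dict[str, Any]:
    """Hypothetical stand-in for `_to_dict`: accept dicts or model_dump() objects."""
    if isinstance(params, dict):
        return params
    if hasattr(params, "model_dump"):
        return params.model_dump()
    raise TypeError(f"unsupported params type: {type(params).__name__}")


def canonical_sha256(payload: dict[str, Any]) -> str:
    # Approximates canonical JSON: keys sorted at every nesting level, no
    # insignificant whitespace. A real JCS encoder also fixes number output.
    text = json.dumps(payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


class FakeModel:
    """Minimal object exposing model_dump(), standing in for a Pydantic model."""

    def __init__(self, **fields: Any) -> None:
        self._fields = fields

    def model_dump(self) -> dict[str, Any]:
        return dict(self._fields)


as_dict = {"idempotency_key": "k-1", "packages": [{"id": "p1", "budget": 50}]}
as_model = FakeModel(packages=[{"budget": 50, "id": "p1"}], idempotency_key="k-1")
changed = {"idempotency_key": "k-1", "packages": [{"id": "p1", "budget": 75}]}

same = canonical_sha256(to_dict(as_dict)) == canonical_sha256(to_dict(as_model))
different = canonical_sha256(to_dict(as_dict)) != canonical_sha256(to_dict(changed))
```

Because the hash covers the normalized params (including `idempotency_key` itself), a retry that changes any field value under the same key is what surfaces as `IDEMPOTENCY_CONFLICT`.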