Module adcp.registry
Classes
class RegistryClient (base_url: str = 'https://agenticadvertising.org',
timeout: float = 10.0,
client: httpx.AsyncClient | None = None,
user_agent: str = 'adcp-client-python')-
Expand source code
class RegistryClient: """Client for the AdCP registry API. Provides brand and property lookups against the central AdCP registry. Args: base_url: Registry API base URL. timeout: Request timeout in seconds. client: Optional httpx.AsyncClient for connection pooling. If provided, caller is responsible for client lifecycle. user_agent: User-Agent header for requests. """ def __init__( self, base_url: str = DEFAULT_REGISTRY_URL, timeout: float = 10.0, client: httpx.AsyncClient | None = None, user_agent: str = "adcp-client-python", ): self._base_url = base_url.rstrip("/") self._timeout = timeout self._external_client = client self._owned_client: httpx.AsyncClient | None = None self._user_agent = user_agent async def _get_client(self) -> httpx.AsyncClient: """Get or create httpx client.""" if self._external_client is not None: return self._external_client if self._owned_client is None: self._owned_client = httpx.AsyncClient( limits=httpx.Limits( max_keepalive_connections=10, max_connections=20, ), ) return self._owned_client async def close(self) -> None: """Close owned HTTP client. No-op if using external client.""" if self._owned_client is not None: await self._owned_client.aclose() self._owned_client = None async def __aenter__(self) -> RegistryClient: return self async def __aexit__(self, *args: Any) -> None: await self.close() async def lookup_brand(self, domain: str) -> ResolvedBrand | None: """Resolve a single domain to its canonical brand identity. Args: domain: Domain to resolve (e.g., "nike.com"). Returns: ResolvedBrand if found, None if the domain is not in the registry. Raises: RegistryError: On HTTP or parsing errors. """ client = await self._get_client() try: response = await client.get( f"{self._base_url}/api/brands/resolve", params={"domain": domain}, headers={"User-Agent": self._user_agent}, timeout=self._timeout, ) if response.status_code == 404: return None if response.status_code != 200: raise RegistryError( f"Brand lookup failed: HTTP {response.status_code}", status_code=response.status_code, ) data = response.json() if data is None: return None return ResolvedBrand.model_validate(data) except RegistryError: raise except httpx.TimeoutException as e: raise RegistryError(f"Brand lookup timed out after {self._timeout}s") from e except httpx.HTTPError as e: raise RegistryError(f"Brand lookup failed: {e}") from e except (ValidationError, ValueError) as e: raise RegistryError(f"Brand lookup failed: invalid response: {e}") from e async def lookup_brands( self, domains: list[str] ) -> dict[str, ResolvedBrand | None]: """Bulk resolve domains to brand identities. Automatically chunks requests exceeding 100 domains. Args: domains: List of domains to resolve. Returns: Dict mapping each domain to its ResolvedBrand, or None if not found. Raises: RegistryError: On HTTP or parsing errors. """ if not domains: return {} chunks = [ domains[i : i + MAX_BULK_DOMAINS] for i in range(0, len(domains), MAX_BULK_DOMAINS) ] chunk_results = await asyncio.gather( *[self._lookup_brands_chunk(chunk) for chunk in chunks], return_exceptions=True, ) merged: dict[str, ResolvedBrand | None] = {} for result in chunk_results: if isinstance(result, BaseException): raise result merged.update(result) return merged async def _lookup_brands_chunk( self, domains: list[str] ) -> dict[str, ResolvedBrand | None]: """Resolve a single chunk of brand domains (max 100).""" client = await self._get_client() try: response = await client.post( f"{self._base_url}/api/brands/resolve/bulk", json={"domains": domains}, headers={"User-Agent": self._user_agent}, timeout=self._timeout, ) if response.status_code != 200: raise RegistryError( f"Bulk brand lookup failed: HTTP {response.status_code}", status_code=response.status_code, ) data = response.json() results_raw = data.get("results", {}) results: dict[str, ResolvedBrand | None] = {d: None for d in domains} for domain, brand_data in results_raw.items(): if brand_data is not None: results[domain] = ResolvedBrand.model_validate(brand_data) return results except RegistryError: raise except httpx.TimeoutException as e: raise RegistryError( f"Bulk brand lookup timed out after {self._timeout}s" ) from e except httpx.HTTPError as e: raise RegistryError(f"Bulk brand lookup failed: {e}") from e except (ValidationError, ValueError) as e: raise RegistryError(f"Bulk brand lookup failed: invalid response: {e}") from e async def lookup_property(self, domain: str) -> ResolvedProperty | None: """Resolve a publisher domain to its property info. Args: domain: Publisher domain to resolve (e.g., "nytimes.com"). Returns: ResolvedProperty if found, None if the domain is not in the registry. Raises: RegistryError: On HTTP or parsing errors. """ client = await self._get_client() try: response = await client.get( f"{self._base_url}/api/properties/resolve", params={"domain": domain}, headers={"User-Agent": self._user_agent}, timeout=self._timeout, ) if response.status_code == 404: return None if response.status_code != 200: raise RegistryError( f"Property lookup failed: HTTP {response.status_code}", status_code=response.status_code, ) data = response.json() if data is None: return None return ResolvedProperty.model_validate(data) except RegistryError: raise except httpx.TimeoutException as e: raise RegistryError( f"Property lookup timed out after {self._timeout}s" ) from e except httpx.HTTPError as e: raise RegistryError(f"Property lookup failed: {e}") from e except (ValidationError, ValueError) as e: raise RegistryError(f"Property lookup failed: invalid response: {e}") from e async def lookup_properties( self, domains: list[str] ) -> dict[str, ResolvedProperty | None]: """Bulk resolve publisher domains to property info. Automatically chunks requests exceeding 100 domains. Args: domains: List of publisher domains to resolve. Returns: Dict mapping each domain to its ResolvedProperty, or None if not found. Raises: RegistryError: On HTTP or parsing errors. """ if not domains: return {} chunks = [ domains[i : i + MAX_BULK_DOMAINS] for i in range(0, len(domains), MAX_BULK_DOMAINS) ] chunk_results = await asyncio.gather( *[self._lookup_properties_chunk(chunk) for chunk in chunks], return_exceptions=True, ) merged: dict[str, ResolvedProperty | None] = {} for result in chunk_results: if isinstance(result, BaseException): raise result merged.update(result) return merged async def _lookup_properties_chunk( self, domains: list[str] ) -> dict[str, ResolvedProperty | None]: """Resolve a single chunk of property domains (max 100).""" client = await self._get_client() try: response = await client.post( f"{self._base_url}/api/properties/resolve/bulk", json={"domains": domains}, headers={"User-Agent": self._user_agent}, timeout=self._timeout, ) if response.status_code != 200: raise RegistryError( f"Bulk property lookup failed: HTTP {response.status_code}", status_code=response.status_code, ) data = response.json() results_raw = data.get("results", {}) results: dict[str, ResolvedProperty | None] = {d: None for d in domains} for domain, prop_data in results_raw.items(): if prop_data is not None: results[domain] = ResolvedProperty.model_validate(prop_data) return results except RegistryError: raise except httpx.TimeoutException as e: raise RegistryError( f"Bulk property lookup timed out after {self._timeout}s" ) from e except httpx.HTTPError as e: raise RegistryError(f"Bulk property lookup failed: {e}") from e except (ValidationError, ValueError) as e: raise RegistryError( f"Bulk property lookup failed: invalid response: {e}" ) from eClient for the AdCP registry API.
Provides brand and property lookups against the central AdCP registry.
Args
base_url- Registry API base URL.
timeout- Request timeout in seconds.
client- Optional httpx.AsyncClient for connection pooling. If provided, caller is responsible for client lifecycle.
user_agent- User-Agent header for requests.
Methods
async def close(self) ‑> None-
Expand source code
async def close(self) -> None: """Close owned HTTP client. No-op if using external client.""" if self._owned_client is not None: await self._owned_client.aclose() self._owned_client = NoneClose owned HTTP client. No-op if using external client.
async def lookup_brand(self, domain: str) ‑> ResolvedBrand | None-
Expand source code
async def lookup_brand(self, domain: str) -> ResolvedBrand | None: """Resolve a single domain to its canonical brand identity. Args: domain: Domain to resolve (e.g., "nike.com"). Returns: ResolvedBrand if found, None if the domain is not in the registry. Raises: RegistryError: On HTTP or parsing errors. """ client = await self._get_client() try: response = await client.get( f"{self._base_url}/api/brands/resolve", params={"domain": domain}, headers={"User-Agent": self._user_agent}, timeout=self._timeout, ) if response.status_code == 404: return None if response.status_code != 200: raise RegistryError( f"Brand lookup failed: HTTP {response.status_code}", status_code=response.status_code, ) data = response.json() if data is None: return None return ResolvedBrand.model_validate(data) except RegistryError: raise except httpx.TimeoutException as e: raise RegistryError(f"Brand lookup timed out after {self._timeout}s") from e except httpx.HTTPError as e: raise RegistryError(f"Brand lookup failed: {e}") from e except (ValidationError, ValueError) as e: raise RegistryError(f"Brand lookup failed: invalid response: {e}") from eResolve a single domain to its canonical brand identity.
Args
domain- Domain to resolve (e.g., "nike.com").
Returns
ResolvedBrand if found, None if the domain is not in the registry.
Raises
RegistryError- On HTTP or parsing errors.
async def lookup_brands(self, domains: list[str]) ‑> dict[str, ResolvedBrand | None]-
Expand source code
async def lookup_brands( self, domains: list[str] ) -> dict[str, ResolvedBrand | None]: """Bulk resolve domains to brand identities. Automatically chunks requests exceeding 100 domains. Args: domains: List of domains to resolve. Returns: Dict mapping each domain to its ResolvedBrand, or None if not found. Raises: RegistryError: On HTTP or parsing errors. """ if not domains: return {} chunks = [ domains[i : i + MAX_BULK_DOMAINS] for i in range(0, len(domains), MAX_BULK_DOMAINS) ] chunk_results = await asyncio.gather( *[self._lookup_brands_chunk(chunk) for chunk in chunks], return_exceptions=True, ) merged: dict[str, ResolvedBrand | None] = {} for result in chunk_results: if isinstance(result, BaseException): raise result merged.update(result) return mergedBulk resolve domains to brand identities.
Automatically chunks requests exceeding 100 domains.
Args
domains- List of domains to resolve.
Returns
Dict mapping each domain to its ResolvedBrand, or None if not found.
Raises
RegistryError- On HTTP or parsing errors.
async def lookup_properties(self, domains: list[str]) ‑> dict[str, ResolvedProperty | None]-
Expand source code
async def lookup_properties( self, domains: list[str] ) -> dict[str, ResolvedProperty | None]: """Bulk resolve publisher domains to property info. Automatically chunks requests exceeding 100 domains. Args: domains: List of publisher domains to resolve. Returns: Dict mapping each domain to its ResolvedProperty, or None if not found. Raises: RegistryError: On HTTP or parsing errors. """ if not domains: return {} chunks = [ domains[i : i + MAX_BULK_DOMAINS] for i in range(0, len(domains), MAX_BULK_DOMAINS) ] chunk_results = await asyncio.gather( *[self._lookup_properties_chunk(chunk) for chunk in chunks], return_exceptions=True, ) merged: dict[str, ResolvedProperty | None] = {} for result in chunk_results: if isinstance(result, BaseException): raise result merged.update(result) return mergedBulk resolve publisher domains to property info.
Automatically chunks requests exceeding 100 domains.
Args
domains- List of publisher domains to resolve.
Returns
Dict mapping each domain to its ResolvedProperty, or None if not found.
Raises
RegistryError- On HTTP or parsing errors.
async def lookup_property(self, domain: str) ‑> ResolvedProperty | None-
Expand source code
async def lookup_property(self, domain: str) -> ResolvedProperty | None: """Resolve a publisher domain to its property info. Args: domain: Publisher domain to resolve (e.g., "nytimes.com"). Returns: ResolvedProperty if found, None if the domain is not in the registry. Raises: RegistryError: On HTTP or parsing errors. """ client = await self._get_client() try: response = await client.get( f"{self._base_url}/api/properties/resolve", params={"domain": domain}, headers={"User-Agent": self._user_agent}, timeout=self._timeout, ) if response.status_code == 404: return None if response.status_code != 200: raise RegistryError( f"Property lookup failed: HTTP {response.status_code}", status_code=response.status_code, ) data = response.json() if data is None: return None return ResolvedProperty.model_validate(data) except RegistryError: raise except httpx.TimeoutException as e: raise RegistryError( f"Property lookup timed out after {self._timeout}s" ) from e except httpx.HTTPError as e: raise RegistryError(f"Property lookup failed: {e}") from e except (ValidationError, ValueError) as e: raise RegistryError(f"Property lookup failed: invalid response: {e}") from eResolve a publisher domain to its property info.
Args
domain- Publisher domain to resolve (e.g., "nytimes.com").
Returns
ResolvedProperty if found, None if the domain is not in the registry.
Raises
RegistryError- On HTTP or parsing errors.