Module adcp.adagents

Functions

def domain_matches(property_domain: str, agent_domain_pattern: str) ‑> bool
Expand source code
def domain_matches(property_domain: str, agent_domain_pattern: str) -> bool:
    """Check if domains match per AdCP rules.

    Rules:
    - Exact match always succeeds
    - 'example.com' matches www.example.com, m.example.com (common subdomains)
    - 'subdomain.example.com' matches that specific subdomain only
    - '*.example.com' matches all subdomains

    Args:
        property_domain: Domain from property
        agent_domain_pattern: Domain pattern from adagents.json

    Returns:
        True if domains match per AdCP rules
    """
    # Normalize both domains for comparison
    try:
        property_domain = _normalize_domain(property_domain)
        agent_domain_pattern = _normalize_domain(agent_domain_pattern)
    except AdagentsValidationError:
        # Invalid domain format - no match
        return False

    # Exact match
    if property_domain == agent_domain_pattern:
        return True

    # Wildcard pattern (*.example.com)
    if agent_domain_pattern.startswith("*."):
        base_domain = agent_domain_pattern[2:]
        return property_domain.endswith(f".{base_domain}")

    # Bare domain matches common subdomains (www, m)
    # If agent pattern is a bare domain (no subdomain), match www/m subdomains
    if "." in agent_domain_pattern and not agent_domain_pattern.startswith("www."):
        # Check if this looks like a bare domain (e.g., example.com)
        parts = agent_domain_pattern.split(".")
        if len(parts) == 2:  # Looks like bare domain
            common_subdomains = ["www", "m"]
            for subdomain in common_subdomains:
                if property_domain == f"{subdomain}.{agent_domain_pattern}":
                    return True

    return False

Check if domains match per AdCP rules.

Rules: - Exact match always succeeds - 'example.com' matches www.example.com, m.example.com (common subdomains) - 'subdomain.example.com' matches that specific subdomain only - '*.example.com' matches all subdomains

Args

property_domain
Domain from property
agent_domain_pattern
Domain pattern from adagents.json

Returns

True if domains match per AdCP rules

async def fetch_adagents(publisher_domain: str,
timeout: float = 10.0,
user_agent: str = 'AdCP-Client/1.0',
client: httpx.AsyncClient | None = None) ‑> dict[str, typing.Any]
Expand source code
async def fetch_adagents(
    publisher_domain: str,
    timeout: float = 10.0,
    user_agent: str = "AdCP-Client/1.0",
    client: httpx.AsyncClient | None = None,
) -> dict[str, Any]:
    """Fetch and parse adagents.json from publisher domain.

    Follows authoritative_location redirects per the AdCP specification. When a
    publisher's adagents.json contains an authoritative_location field instead of
    authorized_agents, this function fetches the referenced URL to get the actual
    authorization data.

    Args:
        publisher_domain: Domain hosting the adagents.json file
        timeout: Request timeout in seconds
        user_agent: User-Agent header for HTTP request
        client: Optional httpx.AsyncClient for connection pooling.
            If provided, caller is responsible for client lifecycle.
            If None, a new client is created for this request.

    Returns:
        Parsed adagents.json data (resolved from authoritative_location if present)

    Raises:
        AdagentsNotFoundError: If adagents.json not found (404)
        AdagentsValidationError: If JSON is invalid, malformed, or redirects
            exceed maximum depth or form a loop
        AdagentsTimeoutError: If request times out

    Notes:
        For production use with multiple requests, pass a shared httpx.AsyncClient
        to enable connection pooling and improve performance.
    """
    # Validate and normalize domain for security
    publisher_domain = _validate_publisher_domain(publisher_domain)

    # Construct initial URL
    url = f"https://{publisher_domain}/.well-known/adagents.json"

    # Track visited URLs to detect loops
    visited_urls: set[str] = set()

    for depth in range(MAX_REDIRECT_DEPTH + 1):
        # Check for redirect loop
        if url in visited_urls:
            raise AdagentsValidationError(f"Circular redirect detected: {url} already visited")
        visited_urls.add(url)

        data = await _fetch_adagents_url(url, timeout, user_agent, client)

        # Check if this is a redirect. A response with authoritative_location but no
        # authorized_agents indicates a redirect. If both are present, authorized_agents
        # takes precedence (response is treated as final).
        if "authoritative_location" in data and "authorized_agents" not in data:
            authoritative_url = data["authoritative_location"]

            # Validate HTTPS requirement
            if not isinstance(authoritative_url, str) or not authoritative_url.startswith(
                "https://"
            ):
                raise AdagentsValidationError(
                    f"authoritative_location must be an HTTPS URL, got: {authoritative_url!r}"
                )

            # Check if we've exceeded max depth
            if depth >= MAX_REDIRECT_DEPTH:
                raise AdagentsValidationError(
                    f"Maximum redirect depth ({MAX_REDIRECT_DEPTH}) exceeded"
                )

            # Follow the redirect
            url = authoritative_url
            continue

        # We have the final data with authorized_agents (or both fields present,
        # in which case authorized_agents takes precedence)
        return data

    # Unreachable: loop always exits via return or raise above
    raise AssertionError("Unreachable")  # pragma: no cover

Fetch and parse adagents.json from publisher domain.

Follows authoritative_location redirects per the AdCP specification. When a publisher's adagents.json contains an authoritative_location field instead of authorized_agents, this function fetches the referenced URL to get the actual authorization data.

Args

publisher_domain
Domain hosting the adagents.json file
timeout
Request timeout in seconds
user_agent
User-Agent header for HTTP request
client
Optional httpx.AsyncClient for connection pooling. If provided, caller is responsible for client lifecycle. If None, a new client is created for this request.

Returns

Parsed adagents.json data (resolved from authoritative_location if present)

Raises

AdagentsNotFoundError
If adagents.json not found (404)
AdagentsValidationError
If JSON is invalid, malformed, or redirects exceed maximum depth or form a loop
AdagentsTimeoutError
If request times out

Notes

For production use with multiple requests, pass a shared httpx.AsyncClient to enable connection pooling and improve performance.

async def fetch_agent_authorizations(agent_url: str,
publisher_domains: list[str],
timeout: float = 10.0,
client: httpx.AsyncClient | None = None) ‑> dict[str, AuthorizationContext]
Expand source code
async def fetch_agent_authorizations(
    agent_url: str,
    publisher_domains: list[str],
    timeout: float = 10.0,
    client: httpx.AsyncClient | None = None,
) -> dict[str, AuthorizationContext]:
    """Fetch authorization contexts by checking publisher adagents.json files.

    This function discovers what publishers have authorized your agent by fetching
    their adagents.json files from the .well-known directory and extracting the
    properties your agent can access.

    This is the "pull" approach - you query publishers to see if they've authorized you.

    Args:
        agent_url: URL of your sales agent
        publisher_domains: List of publisher domains to check (e.g., ["nytimes.com", "wsj.com"])
        timeout: Request timeout in seconds for each fetch
        client: Optional httpx.AsyncClient for connection pooling

    Returns:
        Dictionary mapping publisher domain to AuthorizationContext.
        Only includes domains where the agent is authorized.

    Example:
        >>> # "Pull" approach - check what publishers have authorized you
        >>> contexts = await fetch_agent_authorizations(
        ...     "https://our-sales-agent.com",
        ...     ["nytimes.com", "wsj.com", "cnn.com"]
        ... )
        >>> for domain, ctx in contexts.items():
        ...     print(f"{domain}:")
        ...     print(f"  Property IDs: {ctx.property_ids}")
        ...     print(f"  Tags: {ctx.property_tags}")

    Notes:
        - Silently skips domains where adagents.json is not found or invalid
        - Only returns domains where the agent is explicitly authorized
        - For production use with many domains, pass a shared httpx.AsyncClient
          to enable connection pooling
    """
    import asyncio

    # Create tasks to fetch all adagents.json files in parallel
    async def fetch_authorization_for_domain(
        domain: str,
    ) -> tuple[str, AuthorizationContext | None]:
        """Fetch authorization context for a single domain."""
        try:
            adagents_data = await fetch_adagents(domain, timeout=timeout, client=client)

            # Check if agent is authorized
            if not verify_agent_authorization(adagents_data, agent_url):
                return (domain, None)

            # Get properties for this agent
            properties = get_properties_by_agent(adagents_data, agent_url)

            # Create authorization context
            return (domain, AuthorizationContext(properties))

        except (AdagentsNotFoundError, AdagentsValidationError, AdagentsTimeoutError):
            # Silently skip domains with missing or invalid adagents.json
            return (domain, None)

    # Fetch all domains in parallel
    tasks = [fetch_authorization_for_domain(domain) for domain in publisher_domains]
    results = await asyncio.gather(*tasks)

    # Build result dictionary, filtering out None values
    return {domain: ctx for domain, ctx in results if ctx is not None}

Fetch authorization contexts by checking publisher adagents.json files.

This function discovers what publishers have authorized your agent by fetching their adagents.json files from the .well-known directory and extracting the properties your agent can access.

This is the "pull" approach - you query publishers to see if they've authorized you.

Args

agent_url
URL of your sales agent
publisher_domains
List of publisher domains to check (e.g., ["nytimes.com", "wsj.com"])
timeout
Request timeout in seconds for each fetch
client
Optional httpx.AsyncClient for connection pooling

Returns

Dictionary mapping publisher domain to AuthorizationContext. Only includes domains where the agent is authorized.

Example

>>> # "Pull" approach - check what publishers have authorized you
>>> contexts = await fetch_agent_authorizations(
...     "https://our-sales-agent.com",
...     ["nytimes.com", "wsj.com", "cnn.com"]
... )
>>> for domain, ctx in contexts.items():
...     print(f"{domain}:")
...     print(f"  Property IDs: {ctx.property_ids}")
...     print(f"  Tags: {ctx.property_tags}")

Notes

  • Silently skips domains where adagents.json is not found or invalid
  • Only returns domains where the agent is explicitly authorized
  • For production use with many domains, pass a shared httpx.AsyncClient to enable connection pooling
def get_all_properties(adagents_data: dict[str, Any]) ‑> list[dict[str, typing.Any]]
Expand source code
def get_all_properties(adagents_data: dict[str, Any]) -> list[dict[str, Any]]:
    """Extract all properties from adagents.json data.

    Args:
        adagents_data: Parsed adagents.json data

    Returns:
        List of all properties across all authorized agents, with agent_url added

    Raises:
        AdagentsValidationError: If adagents_data is malformed
    """
    if not isinstance(adagents_data, dict):
        raise AdagentsValidationError("adagents_data must be a dictionary")

    authorized_agents = adagents_data.get("authorized_agents")
    if not isinstance(authorized_agents, list):
        raise AdagentsValidationError("adagents.json must have 'authorized_agents' array")

    properties = []
    for agent in authorized_agents:
        if not isinstance(agent, dict):
            continue

        agent_url = agent.get("url", "")
        if not agent_url:
            continue

        agent_properties = agent.get("properties", [])
        if not isinstance(agent_properties, list):
            continue

        # Add each property with the agent URL for reference
        for prop in agent_properties:
            if isinstance(prop, dict):
                # Create a copy and add agent_url
                prop_with_agent = {**prop, "agent_url": agent_url}
                properties.append(prop_with_agent)

    return properties

Extract all properties from adagents.json data.

Args

adagents_data
Parsed adagents.json data

Returns

List of all properties across all authorized agents, with agent_url added

Raises

AdagentsValidationError
If adagents_data is malformed
def get_all_tags(adagents_data: dict[str, Any]) ‑> set[str]
Expand source code
def get_all_tags(adagents_data: dict[str, Any]) -> set[str]:
    """Extract all unique tags from properties in adagents.json data.

    Args:
        adagents_data: Parsed adagents.json data

    Returns:
        Set of all unique tags across all properties

    Raises:
        AdagentsValidationError: If adagents_data is malformed
    """
    properties = get_all_properties(adagents_data)
    tags = set()

    for prop in properties:
        prop_tags = prop.get("tags", [])
        if isinstance(prop_tags, list):
            for tag in prop_tags:
                if isinstance(tag, str):
                    tags.add(tag)

    return tags

Extract all unique tags from properties in adagents.json data.

Args

adagents_data
Parsed adagents.json data

Returns

Set of all unique tags across all properties

Raises

AdagentsValidationError
If adagents_data is malformed
def get_properties_by_agent(adagents_data: dict[str, Any], agent_url: str) ‑> list[dict[str, typing.Any]]
Expand source code
def get_properties_by_agent(adagents_data: dict[str, Any], agent_url: str) -> list[dict[str, Any]]:
    """Get all properties authorized for a specific agent.

    Handles all authorization types per the AdCP specification:
    - inline_properties: Properties defined directly in the agent's properties array
    - property_ids: Filter top-level properties by property_id
    - property_tags: Filter top-level properties by tags
    - publisher_properties: References properties from other publisher domains
      (returns the selector objects, not resolved properties)

    Args:
        adagents_data: Parsed adagents.json data
        agent_url: URL of the agent to filter by

    Returns:
        List of properties for the specified agent (empty if agent not found)

    Raises:
        AdagentsValidationError: If adagents_data is malformed
    """
    if not isinstance(adagents_data, dict):
        raise AdagentsValidationError("adagents_data must be a dictionary")

    authorized_agents = adagents_data.get("authorized_agents")
    if not isinstance(authorized_agents, list):
        raise AdagentsValidationError("adagents.json must have 'authorized_agents' array")

    # Get top-level properties for reference-based authorization types
    top_level_properties = adagents_data.get("properties", [])
    if not isinstance(top_level_properties, list):
        top_level_properties = []

    # Normalize the agent URL for comparison
    normalized_agent_url = normalize_url(agent_url)

    for agent in authorized_agents:
        if not isinstance(agent, dict):
            continue

        agent_url_from_json = agent.get("url", "")
        if not agent_url_from_json:
            continue

        # Match agent URL (protocol-agnostic)
        if normalize_url(agent_url_from_json) != normalized_agent_url:
            continue

        # Found the agent - determine authorization type
        authorization_type = agent.get("authorization_type", "")

        # Handle inline_properties (properties array directly on agent)
        if authorization_type == "inline_properties" or "properties" in agent:
            properties = agent.get("properties", [])
            if not isinstance(properties, list):
                return []
            return [p for p in properties if isinstance(p, dict)]

        # Handle property_ids (filter top-level properties by property_id)
        if authorization_type == "property_ids":
            authorized_ids = set(agent.get("property_ids", []))
            return [
                p
                for p in top_level_properties
                if isinstance(p, dict) and p.get("property_id") in authorized_ids
            ]

        # Handle property_tags (filter top-level properties by tags)
        if authorization_type == "property_tags":
            authorized_tags = set(agent.get("property_tags", []))
            return [
                p
                for p in top_level_properties
                if isinstance(p, dict) and set(p.get("tags", [])) & authorized_tags
            ]

        # Handle publisher_properties (cross-domain references)
        # Returns the selector objects; caller must resolve against other domains
        if authorization_type == "publisher_properties":
            publisher_props = agent.get("publisher_properties", [])
            if not isinstance(publisher_props, list):
                return []
            return [p for p in publisher_props if isinstance(p, dict)]

        # No recognized authorization type - return empty
        return []

    return []

Get all properties authorized for a specific agent.

Handles all authorization types per the AdCP specification: - inline_properties: Properties defined directly in the agent's properties array - property_ids: Filter top-level properties by property_id - property_tags: Filter top-level properties by tags - publisher_properties: References properties from other publisher domains (returns the selector objects, not resolved properties)

Args

adagents_data
Parsed adagents.json data
agent_url
URL of the agent to filter by

Returns

List of properties for the specified agent (empty if agent not found)

Raises

AdagentsValidationError
If adagents_data is malformed
def identifiers_match(property_identifiers: list[dict[str, str]],
agent_identifiers: list[dict[str, str]]) ‑> bool
Expand source code
def identifiers_match(
    property_identifiers: list[dict[str, str]],
    agent_identifiers: list[dict[str, str]],
) -> bool:
    """Check if any property identifier matches agent's authorized identifiers.

    Args:
        property_identifiers: Identifiers from property
            (e.g., [{"type": "domain", "value": "cnn.com"}])
        agent_identifiers: Identifiers from adagents.json

    Returns:
        True if any identifier matches

    Notes:
        - Domain identifiers use AdCP domain matching rules
        - Other identifiers (bundle_id, roku_store_id, etc.) require exact match
    """
    for prop_id in property_identifiers:
        prop_type = prop_id.get("type", "")
        prop_value = prop_id.get("value", "")

        for agent_id in agent_identifiers:
            agent_type = agent_id.get("type", "")
            agent_value = agent_id.get("value", "")

            # Type must match
            if prop_type != agent_type:
                continue

            # Domain identifiers use special matching rules
            if prop_type == "domain":
                if domain_matches(prop_value, agent_value):
                    return True
            else:
                # Other identifier types require exact match
                if prop_value == agent_value:
                    return True

    return False

Check if any property identifier matches agent's authorized identifiers.

Args

property_identifiers
Identifiers from property (e.g., [{"type": "domain", "value": "cnn.com"}])
agent_identifiers
Identifiers from adagents.json

Returns

True if any identifier matches

Notes

  • Domain identifiers use AdCP domain matching rules
  • Other identifiers (bundle_id, roku_store_id, etc.) require exact match
def normalize_url(url: str) ‑> str
Expand source code
def normalize_url(url: str) -> str:
    """Normalize URL by removing protocol and trailing slash.

    Args:
        url: URL to normalize

    Returns:
        Normalized URL (domain/path without protocol or trailing slash)
    """
    parsed = urlparse(url)
    normalized = parsed.netloc + parsed.path
    return normalized.rstrip("/")

Normalize URL by removing protocol and trailing slash.

Args

url
URL to normalize

Returns

Normalized URL (domain/path without protocol or trailing slash)

def verify_agent_authorization(adagents_data: dict[str, Any],
agent_url: str,
property_type: str | None = None,
property_identifiers: list[dict[str, str]] | None = None) ‑> bool
Expand source code
def verify_agent_authorization(
    adagents_data: dict[str, Any],
    agent_url: str,
    property_type: str | None = None,
    property_identifiers: list[dict[str, str]] | None = None,
) -> bool:
    """Check if agent is authorized for a property.

    Args:
        adagents_data: Parsed adagents.json data
        agent_url: URL of the sales agent to verify
        property_type: Type of property (website, app, etc.) - optional
        property_identifiers: List of identifiers to match - optional

    Returns:
        True if agent is authorized, False otherwise

    Raises:
        AdagentsValidationError: If adagents_data is malformed

    Notes:
        - If property_type/identifiers are None, checks if agent is authorized
          for ANY property on this domain
        - Implements AdCP domain matching rules
        - Agent URLs are matched ignoring protocol and trailing slash
    """
    # Validate structure
    if not isinstance(adagents_data, dict):
        raise AdagentsValidationError("adagents_data must be a dictionary")

    authorized_agents = adagents_data.get("authorized_agents")
    if not isinstance(authorized_agents, list):
        raise AdagentsValidationError("adagents.json must have 'authorized_agents' array")

    # Normalize the agent URL for comparison
    normalized_agent_url = normalize_url(agent_url)

    # Check each authorized agent
    for agent in authorized_agents:
        if not isinstance(agent, dict):
            continue

        agent_url_from_json = agent.get("url", "")
        if not agent_url_from_json:
            continue

        # Match agent URL (protocol-agnostic)
        if normalize_url(agent_url_from_json) != normalized_agent_url:
            continue

        # Found matching agent - now check properties
        properties = agent.get("properties")

        # If properties field is missing or empty, agent is authorized for all properties
        if properties is None or (isinstance(properties, list) and len(properties) == 0):
            return True

        # If no property filters specified, we found the agent - authorized
        if property_type is None and property_identifiers is None:
            return True

        # Check specific property authorization
        if isinstance(properties, list):
            for prop in properties:
                if not isinstance(prop, dict):
                    continue

                # Check property type if specified
                if property_type is not None:
                    prop_type = prop.get("property_type", "")
                    if prop_type != property_type:
                        continue

                # Check identifiers if specified
                if property_identifiers is not None:
                    prop_identifiers = prop.get("identifiers", [])
                    if not isinstance(prop_identifiers, list):
                        continue

                    if identifiers_match(property_identifiers, prop_identifiers):
                        return True
                else:
                    # Property type matched and no identifier check needed
                    return True

    return False

Check if agent is authorized for a property.

Args

adagents_data
Parsed adagents.json data
agent_url
URL of the sales agent to verify
property_type
Type of property (website, app, etc.) - optional
property_identifiers
List of identifiers to match - optional

Returns

True if agent is authorized, False otherwise

Raises

AdagentsValidationError
If adagents_data is malformed

Notes

  • If property_type/identifiers are None, checks if agent is authorized for ANY property on this domain
  • Implements AdCP domain matching rules
  • Agent URLs are matched ignoring protocol and trailing slash
async def verify_agent_for_property(publisher_domain: str,
agent_url: str,
property_identifiers: list[dict[str, str]],
property_type: str | None = None,
timeout: float = 10.0,
client: httpx.AsyncClient | None = None) ‑> bool
Expand source code
async def verify_agent_for_property(
    publisher_domain: str,
    agent_url: str,
    property_identifiers: list[dict[str, str]],
    property_type: str | None = None,
    timeout: float = 10.0,
    client: httpx.AsyncClient | None = None,
) -> bool:
    """Convenience wrapper to fetch adagents.json and verify authorization in one call.

    Args:
        publisher_domain: Domain hosting the adagents.json file
        agent_url: URL of the sales agent to verify
        property_identifiers: List of identifiers to match
        property_type: Type of property (website, app, etc.) - optional
        timeout: Request timeout in seconds
        client: Optional httpx.AsyncClient for connection pooling

    Returns:
        True if agent is authorized, False otherwise

    Raises:
        AdagentsNotFoundError: If adagents.json not found (404)
        AdagentsValidationError: If JSON is invalid or malformed
        AdagentsTimeoutError: If request times out
    """
    adagents_data = await fetch_adagents(publisher_domain, timeout=timeout, client=client)
    return verify_agent_authorization(
        adagents_data=adagents_data,
        agent_url=agent_url,
        property_type=property_type,
        property_identifiers=property_identifiers,
    )

Convenience wrapper to fetch adagents.json and verify authorization in one call.

Args

publisher_domain
Domain hosting the adagents.json file
agent_url
URL of the sales agent to verify
property_identifiers
List of identifiers to match
property_type
Type of property (website, app, etc.) - optional
timeout
Request timeout in seconds
client
Optional httpx.AsyncClient for connection pooling

Returns

True if agent is authorized, False otherwise

Raises

AdagentsNotFoundError
If adagents.json not found (404)
AdagentsValidationError
If JSON is invalid or malformed
AdagentsTimeoutError
If request times out

Classes

class AuthorizationContext (properties: list[dict[str, Any]])
Expand source code
class AuthorizationContext:
    """Authorization context for a publisher domain.

    Attributes:
        property_ids: List of property IDs the agent is authorized for
        property_tags: List of property tags the agent is authorized for
        raw_properties: Raw property data from adagents.json
    """

    def __init__(self, properties: list[dict[str, Any]]):
        """Initialize from list of properties.

        Args:
            properties: List of property dictionaries from adagents.json
        """
        self.property_ids: list[str] = []
        self.property_tags: list[str] = []
        self.raw_properties = properties

        # Extract property IDs and tags
        for prop in properties:
            if not isinstance(prop, dict):
                continue

            # Extract property ID (per AdCP v2 schema, the field is "property_id")
            prop_id = prop.get("property_id")
            if prop_id and isinstance(prop_id, str):
                self.property_ids.append(prop_id)

            # Extract tags
            tags = prop.get("tags", [])
            if isinstance(tags, list):
                for tag in tags:
                    if isinstance(tag, str) and tag not in self.property_tags:
                        self.property_tags.append(tag)

    def __repr__(self) -> str:
        return (
            f"AuthorizationContext("
            f"property_ids={self.property_ids}, "
            f"property_tags={self.property_tags})"
        )

Authorization context for a publisher domain.

Attributes

property_ids
List of property IDs the agent is authorized for
property_tags
List of property tags the agent is authorized for
raw_properties
Raw property data from adagents.json

Initialize from list of properties.

Args

properties
List of property dictionaries from adagents.json