Module adcp.adagents

Functions

def domain_matches(property_domain: str, agent_domain_pattern: str) ‑> bool
Expand source code
def domain_matches(property_domain: str, agent_domain_pattern: str) -> bool:
    """Decide whether a property domain is covered by an adagents.json pattern.

    Matching rules, applied to the normalized forms of both inputs:
    - Identical domains always match
    - A two-label pattern such as 'example.com' also covers the common
      'www.' and 'm.' subdomains of itself
    - A pattern with its own subdomain covers only that exact host
    - A '*.example.com' pattern covers every subdomain of example.com

    Args:
        property_domain: Domain taken from the property
        agent_domain_pattern: Domain pattern taken from adagents.json

    Returns:
        True when the property domain satisfies the pattern, otherwise False.
        A domain that fails normalization never matches.
    """
    try:
        candidate = _normalize_domain(property_domain)
        pattern = _normalize_domain(agent_domain_pattern)
    except AdagentsValidationError:
        # Either side is not a valid domain, so there is nothing to compare
        return False

    if candidate == pattern:
        return True

    # '*.example.com' -> any host ending in '.example.com'
    if pattern.startswith("*."):
        return candidate.endswith("." + pattern[2:])

    # A bare domain has exactly one dot and is not itself a 'www.' host;
    # it additionally covers its 'www' and 'm' subdomains.
    is_bare_domain = pattern.count(".") == 1 and not pattern.startswith("www.")
    return is_bare_domain and candidate in {f"www.{pattern}", f"m.{pattern}"}

Check if domains match per AdCP rules.

Rules:

  • Exact match always succeeds
  • 'example.com' matches www.example.com, m.example.com (common subdomains)
  • 'subdomain.example.com' matches that specific subdomain only
  • '*.example.com' matches all subdomains

Args

property_domain
Domain from property
agent_domain_pattern
Domain pattern from adagents.json

Returns

True if domains match per AdCP rules

async def fetch_adagents(publisher_domain: str,
timeout: float = 10.0,
user_agent: str = 'AdCP-Client/1.0',
client: httpx.AsyncClient | None = None) ‑> dict[str, typing.Any]
Expand source code
async def fetch_adagents(
    publisher_domain: str,
    timeout: float = 10.0,
    user_agent: str = "AdCP-Client/1.0",
    client: httpx.AsyncClient | None = None,
) -> dict[str, Any]:
    """Download a publisher's /.well-known/adagents.json and return it parsed.

    Args:
        publisher_domain: Domain that hosts the adagents.json file
        timeout: Per-request timeout, in seconds
        user_agent: Value sent in the User-Agent request header
        client: Optional shared httpx.AsyncClient. When supplied it is used
            as-is and the caller keeps ownership of its lifecycle; when
            omitted a throwaway client is created just for this request.

    Returns:
        The parsed adagents.json document

    Raises:
        AdagentsNotFoundError: The server answered 404
        AdagentsValidationError: Any other non-200 status, a transport-level
            request error, an unparseable body, or a document that fails the
            structural checks
        AdagentsTimeoutError: The request timed out

    Notes:
        Callers issuing many requests should pass a shared httpx.AsyncClient
        so connections can be pooled.
    """
    # The domain is validated/normalized before it is interpolated into a URL
    publisher_domain = _validate_publisher_domain(publisher_domain)
    url = f"https://{publisher_domain}/.well-known/adagents.json"

    async def _get(http: httpx.AsyncClient) -> httpx.Response:
        """Issue the GET with identical options regardless of client origin."""
        return await http.get(
            url,
            headers={"User-Agent": user_agent},
            timeout=timeout,
            follow_redirects=True,
        )

    try:
        if client is None:
            # No client supplied: open one for this single request only
            async with httpx.AsyncClient() as owned_client:
                response = await _get(owned_client)
        else:
            # Caller-supplied client enables connection pooling
            response = await _get(client)

        status = response.status_code
        if status == 404:
            raise AdagentsNotFoundError(publisher_domain)
        if status != 200:
            raise AdagentsValidationError(
                f"Failed to fetch adagents.json: HTTP {response.status_code}"
            )

        try:
            data = response.json()
        except Exception as e:
            raise AdagentsValidationError(f"Invalid JSON in adagents.json: {e}") from e

        # Minimal shape checks before handing off to the full validator
        if not isinstance(data, dict):
            raise AdagentsValidationError("adagents.json must be a JSON object")
        if "authorized_agents" not in data:
            raise AdagentsValidationError("adagents.json must have 'authorized_agents' field")
        if not isinstance(data["authorized_agents"], list):
            raise AdagentsValidationError("'authorized_agents' must be an array")

        try:
            validate_adagents(data)
        except ValidationError as e:
            raise AdagentsValidationError(f"Invalid adagents.json structure: {e}") from e

        return data

    except httpx.TimeoutException as e:
        # Must be handled before RequestError so timeouts keep their own type
        raise AdagentsTimeoutError(publisher_domain, timeout) from e
    except httpx.RequestError as e:
        raise AdagentsValidationError(f"Failed to fetch adagents.json: {e}") from e

Fetch and parse adagents.json from publisher domain.

Args

publisher_domain
Domain hosting the adagents.json file
timeout
Request timeout in seconds
user_agent
User-Agent header for HTTP request
client
Optional httpx.AsyncClient for connection pooling. If provided, caller is responsible for client lifecycle. If None, a new client is created for this request.

Returns

Parsed adagents.json data

Raises

AdagentsNotFoundError
If adagents.json not found (404)
AdagentsValidationError
If JSON is invalid or malformed
AdagentsTimeoutError
If request times out

Notes

For production use with multiple requests, pass a shared httpx.AsyncClient to enable connection pooling and improve performance.

async def fetch_agent_authorizations(agent_url: str,
publisher_domains: list[str],
timeout: float = 10.0,
client: httpx.AsyncClient | None = None) ‑> dict[str, AuthorizationContext]
Expand source code
async def fetch_agent_authorizations(
    agent_url: str,
    publisher_domains: list[str],
    timeout: float = 10.0,
    client: httpx.AsyncClient | None = None,
) -> dict[str, AuthorizationContext]:
    """Discover which publishers have authorized an agent.

    Each publisher's adagents.json is fetched from its .well-known directory
    and inspected for the given agent; the properties listed for that agent
    are wrapped in an AuthorizationContext.

    This is the "pull" approach - publishers are queried to see whether they
    have authorized you. For the "push" approach, where the agent itself
    reports what it is authorized for, use the agent's
    list_authorized_properties endpoint via ADCPClient.

    Args:
        agent_url: URL of your sales agent
        publisher_domains: Publisher domains to check (e.g., ["nytimes.com", "wsj.com"])
        timeout: Per-fetch request timeout, in seconds
        client: Optional httpx.AsyncClient for connection pooling

    Returns:
        Mapping of publisher domain to AuthorizationContext, containing only
        the domains on which the agent is authorized.

    Example:
        >>> # "Pull" approach - check what publishers have authorized you
        >>> contexts = await fetch_agent_authorizations(
        ...     "https://our-sales-agent.com",
        ...     ["nytimes.com", "wsj.com", "cnn.com"]
        ... )
        >>> for domain, ctx in contexts.items():
        ...     print(f"{domain}:")
        ...     print(f"  Property IDs: {ctx.property_ids}")
        ...     print(f"  Tags: {ctx.property_tags}")

    See Also:
        ADCPClient.list_authorized_properties: "Push" approach using the agent's API

    Notes:
        - Domains whose adagents.json is missing, invalid, or times out are
          skipped without raising
        - Only domains that explicitly authorize the agent are returned
        - When checking many domains, pass a shared httpx.AsyncClient so
          connections are pooled
    """
    import asyncio

    async def _lookup(domain: str) -> tuple[str, AuthorizationContext | None]:
        """Resolve one domain to (domain, context), or (domain, None) if unauthorized."""
        try:
            document = await fetch_adagents(domain, timeout=timeout, client=client)
            if verify_agent_authorization(document, agent_url):
                granted = get_properties_by_agent(document, agent_url)
                return (domain, AuthorizationContext(granted))
            return (domain, None)
        except (AdagentsNotFoundError, AdagentsValidationError, AdagentsTimeoutError):
            # Best-effort discovery: a broken publisher must not fail the batch
            return (domain, None)

    # All lookups run concurrently; gather preserves input order
    outcomes = await asyncio.gather(*(_lookup(domain) for domain in publisher_domains))

    authorized: dict[str, AuthorizationContext] = {}
    for domain, context in outcomes:
        if context is not None:
            authorized[domain] = context
    return authorized

Fetch authorization contexts by checking publisher adagents.json files.

This function discovers what publishers have authorized your agent by fetching their adagents.json files from the .well-known directory and extracting the properties your agent can access.

This is the "pull" approach - you query publishers to see if they've authorized you. For the "push" approach where the agent tells you what it's authorized for, use the agent's list_authorized_properties endpoint via ADCPClient.

Args

agent_url
URL of your sales agent
publisher_domains
List of publisher domains to check (e.g., ["nytimes.com", "wsj.com"])
timeout
Request timeout in seconds for each fetch
client
Optional httpx.AsyncClient for connection pooling

Returns

Dictionary mapping publisher domain to AuthorizationContext. Only includes domains where the agent is authorized.

Example

>>> # "Pull" approach - check what publishers have authorized you
>>> contexts = await fetch_agent_authorizations(
...     "https://our-sales-agent.com",
...     ["nytimes.com", "wsj.com", "cnn.com"]
... )
>>> for domain, ctx in contexts.items():
...     print(f"{domain}:")
...     print(f"  Property IDs: {ctx.property_ids}")
...     print(f"  Tags: {ctx.property_tags}")

See Also: ADCPClient.list_authorized_properties: "Push" approach using the agent's API

Notes

  • Silently skips domains where adagents.json is not found or invalid
  • Only returns domains where the agent is explicitly authorized
  • For production use with many domains, pass a shared httpx.AsyncClient to enable connection pooling
def get_all_properties(adagents_data: dict[str, Any]) ‑> list[dict[str, typing.Any]]
Expand source code
def get_all_properties(adagents_data: dict[str, Any]) -> list[dict[str, Any]]:
    """Flatten every agent's properties into a single list.

    Args:
        adagents_data: Parsed adagents.json data

    Returns:
        One shallow copy per property dictionary, in document order, each
        carrying an extra "agent_url" key naming the agent that listed it.
        Agents that are not dictionaries, have no URL, or whose "properties"
        value is not a list are skipped, as are non-dictionary properties.

    Raises:
        AdagentsValidationError: If adagents_data is not a dictionary or has
            no 'authorized_agents' list
    """
    if not isinstance(adagents_data, dict):
        raise AdagentsValidationError("adagents_data must be a dictionary")

    agent_entries = adagents_data.get("authorized_agents")
    if not isinstance(agent_entries, list):
        raise AdagentsValidationError("adagents.json must have 'authorized_agents' array")

    collected: list[dict[str, Any]] = []
    for entry in agent_entries:
        if not isinstance(entry, dict):
            continue

        owner_url = entry.get("url", "")
        declared = entry.get("properties", [])
        if not owner_url or not isinstance(declared, list):
            continue

        # Copy each property so the caller's input is never mutated;
        # "agent_url" is written last and therefore wins over any existing key.
        collected.extend(
            {**item, "agent_url": owner_url} for item in declared if isinstance(item, dict)
        )

    return collected

Extract all properties from adagents.json data.

Args

adagents_data
Parsed adagents.json data

Returns

List of all properties across all authorized agents, with agent_url added

Raises

AdagentsValidationError
If adagents_data is malformed
def get_all_tags(adagents_data: dict[str, Any]) ‑> set[str]
Expand source code
def get_all_tags(adagents_data: dict[str, Any]) -> set[str]:
    """Collect the distinct tags used by any property in adagents.json data.

    Args:
        adagents_data: Parsed adagents.json data

    Returns:
        Set of every string tag found in a property's "tags" list.
        Non-list "tags" values and non-string tags are ignored.

    Raises:
        AdagentsValidationError: If adagents_data is malformed
    """
    unique_tags: set[str] = set()

    for entry in get_all_properties(adagents_data):
        tag_field = entry.get("tags", [])
        if isinstance(tag_field, list):
            unique_tags.update(tag for tag in tag_field if isinstance(tag, str))

    return unique_tags

Extract all unique tags from properties in adagents.json data.

Args

adagents_data
Parsed adagents.json data

Returns

Set of all unique tags across all properties

Raises

AdagentsValidationError
If adagents_data is malformed
def get_properties_by_agent(adagents_data: dict[str, Any], agent_url: str) ‑> list[dict[str, typing.Any]]
Expand source code
def get_properties_by_agent(adagents_data: dict[str, Any], agent_url: str) -> list[dict[str, Any]]:
    """Return the properties listed for one agent.

    Args:
        adagents_data: Parsed adagents.json data
        agent_url: URL of the agent whose properties are wanted

    Returns:
        The dictionary entries of the first matching agent's "properties"
        list. Empty when no agent matches, or when the matching agent has no
        usable properties list.

    Raises:
        AdagentsValidationError: If adagents_data is not a dictionary or has
            no 'authorized_agents' list
    """
    if not isinstance(adagents_data, dict):
        raise AdagentsValidationError("adagents_data must be a dictionary")

    agent_entries = adagents_data.get("authorized_agents")
    if not isinstance(agent_entries, list):
        raise AdagentsValidationError("adagents.json must have 'authorized_agents' array")

    # Both sides go through normalize_url so the comparison is like-for-like
    wanted = normalize_url(agent_url)

    for entry in agent_entries:
        if not isinstance(entry, dict):
            continue

        entry_url = entry.get("url", "")
        if entry_url and normalize_url(entry_url) == wanted:
            # Only the first matching agent entry is consulted
            declared = entry.get("properties", [])
            if isinstance(declared, list):
                return [item for item in declared if isinstance(item, dict)]
            return []

    return []

Get all properties authorized for a specific agent.

Args

adagents_data
Parsed adagents.json data
agent_url
URL of the agent to filter by

Returns

List of properties for the specified agent (empty if agent not found or no properties)

Raises

AdagentsValidationError
If adagents_data is malformed
def identifiers_match(property_identifiers: list[dict[str, str]],
agent_identifiers: list[dict[str, str]]) ‑> bool
Expand source code
def identifiers_match(
    property_identifiers: list[dict[str, str]],
    agent_identifiers: list[dict[str, str]],
) -> bool:
    """Check if any property identifier matches agent's authorized identifiers.

    Args:
        property_identifiers: Identifiers from property
            (e.g., [{"type": "domain", "value": "cnn.com"}])
        agent_identifiers: Identifiers from adagents.json

    Returns:
        True if any identifier matches

    Notes:
        - Domain identifiers use AdCP domain matching rules
        - Other identifiers (bundle_id, roku_store_id, etc.) require exact match
        - Malformed identifiers (not a dictionary, or with a missing/empty
          "type" or "value") never match anything. Without this, two
          identifiers that both lack a type and value would compare equal
          and wrongly grant authorization.
    """
    for prop_id in property_identifiers:
        # adagents.json is untrusted input: skip entries of the wrong shape
        if not isinstance(prop_id, dict):
            continue

        prop_type = prop_id.get("type")
        prop_value = prop_id.get("value")
        if not prop_type or not prop_value:
            continue

        for agent_id in agent_identifiers:
            if not isinstance(agent_id, dict):
                continue

            agent_type = agent_id.get("type")
            agent_value = agent_id.get("value")
            if not agent_type or not agent_value:
                continue

            # Type must match
            if prop_type != agent_type:
                continue

            if prop_type == "domain":
                # Domain identifiers use special matching rules
                if domain_matches(prop_value, agent_value):
                    return True
            elif prop_value == agent_value:
                # Other identifier types require exact match
                return True

    return False

Check if any property identifier matches agent's authorized identifiers.

Args

property_identifiers
Identifiers from property (e.g., [{"type": "domain", "value": "cnn.com"}])
agent_identifiers
Identifiers from adagents.json

Returns

True if any identifier matches

Notes

  • Domain identifiers use AdCP domain matching rules
  • Other identifiers (bundle_id, roku_store_id, etc.) require exact match
def normalize_url(url: str) ‑> str
Expand source code
def normalize_url(url: str) -> str:
    """Normalize URL by removing protocol and trailing slash.

    The host portion is also lowercased, because hostnames are
    case-insensitive: "https://Agent.Example.com" and
    "https://agent.example.com" identify the same agent and must normalize
    to the same string. Userinfo and path keep their case, since they can be
    case-sensitive.

    Args:
        url: URL to normalize

    Returns:
        Normalized URL (domain/path without protocol or trailing slash)
    """
    parsed = urlparse(url)

    # netloc is "[userinfo@]host[:port]"; lowercase only the host[:port] part
    userinfo, separator, host_and_port = parsed.netloc.rpartition("@")
    normalized = f"{userinfo}{separator}{host_and_port.lower()}{parsed.path}"

    return normalized.rstrip("/")

Normalize URL by removing protocol and trailing slash.

Args

url
URL to normalize

Returns

Normalized URL (domain/path without protocol or trailing slash)

def verify_agent_authorization(adagents_data: dict[str, Any],
agent_url: str,
property_type: str | None = None,
property_identifiers: list[dict[str, str]] | None = None) ‑> bool
Expand source code
def verify_agent_authorization(
    adagents_data: dict[str, Any],
    agent_url: str,
    property_type: str | None = None,
    property_identifiers: list[dict[str, str]] | None = None,
) -> bool:
    """Determine whether adagents.json authorizes an agent for a property.

    Args:
        adagents_data: Parsed adagents.json data
        agent_url: URL of the sales agent to verify
        property_type: Type of property (website, app, etc.) - optional
        property_identifiers: List of identifiers to match - optional

    Returns:
        True if agent is authorized, False otherwise

    Raises:
        AdagentsValidationError: If adagents_data is not a dictionary or has
            no 'authorized_agents' list

    Notes:
        - With neither property_type nor property_identifiers given, the
          check is simply whether the agent is listed at all
        - An agent entry whose "properties" is absent or an empty list is
          treated as authorized for everything
        - Agent URLs are compared after normalize_url is applied to both sides
        - Identifier comparison is delegated to identifiers_match
    """
    if not isinstance(adagents_data, dict):
        raise AdagentsValidationError("adagents_data must be a dictionary")

    agent_entries = adagents_data.get("authorized_agents")
    if not isinstance(agent_entries, list):
        raise AdagentsValidationError("adagents.json must have 'authorized_agents' array")

    wanted_url = normalize_url(agent_url)
    no_filters = property_type is None and property_identifiers is None

    for entry in agent_entries:
        if not isinstance(entry, dict):
            continue

        entry_url = entry.get("url", "")
        if not entry_url or normalize_url(entry_url) != wanted_url:
            continue

        # This entry is for the requested agent; inspect what it covers
        declared = entry.get("properties")

        # Missing or empty "properties" means an unrestricted grant
        if declared is None or (isinstance(declared, list) and not declared):
            return True

        # Caller only asked "is the agent listed?"
        if no_filters:
            return True

        if not isinstance(declared, list):
            # Unusable properties value: keep looking at later entries
            continue

        for candidate in declared:
            if not isinstance(candidate, dict):
                continue

            # Type filter, when requested, must be satisfied first
            if property_type is not None and candidate.get("property_type", "") != property_type:
                continue

            if property_identifiers is None:
                # Type matched and there is no identifier filter
                return True

            candidate_ids = candidate.get("identifiers", [])
            if isinstance(candidate_ids, list) and identifiers_match(
                property_identifiers, candidate_ids
            ):
                return True

    return False

Check if agent is authorized for a property.

Args

adagents_data
Parsed adagents.json data
agent_url
URL of the sales agent to verify
property_type
Type of property (website, app, etc.) - optional
property_identifiers
List of identifiers to match - optional

Returns

True if agent is authorized, False otherwise

Raises

AdagentsValidationError
If adagents_data is malformed

Notes

  • If property_type/identifiers are None, checks if agent is authorized for ANY property on this domain
  • Implements AdCP domain matching rules
  • Agent URLs are matched ignoring protocol and trailing slash
async def verify_agent_for_property(publisher_domain: str,
agent_url: str,
property_identifiers: list[dict[str, str]],
property_type: str | None = None,
timeout: float = 10.0,
client: httpx.AsyncClient | None = None) ‑> bool
Expand source code
async def verify_agent_for_property(
    publisher_domain: str,
    agent_url: str,
    property_identifiers: list[dict[str, str]],
    property_type: str | None = None,
    timeout: float = 10.0,
    client: httpx.AsyncClient | None = None,
) -> bool:
    """Fetch a publisher's adagents.json and check an agent against it.

    Combines fetch_adagents and verify_agent_authorization into one call.

    Args:
        publisher_domain: Domain hosting the adagents.json file
        agent_url: URL of the sales agent to verify
        property_identifiers: List of identifiers to match
        property_type: Type of property (website, app, etc.) - optional
        timeout: Request timeout in seconds
        client: Optional httpx.AsyncClient for connection pooling

    Returns:
        True if agent is authorized, False otherwise

    Raises:
        AdagentsNotFoundError: If adagents.json not found (404)
        AdagentsValidationError: If JSON is invalid or malformed
        AdagentsTimeoutError: If request times out
    """
    # Fetch errors propagate to the caller unchanged
    document = await fetch_adagents(publisher_domain, timeout=timeout, client=client)

    is_authorized = verify_agent_authorization(
        adagents_data=document,
        agent_url=agent_url,
        property_type=property_type,
        property_identifiers=property_identifiers,
    )
    return is_authorized

Convenience wrapper to fetch adagents.json and verify authorization in one call.

Args

publisher_domain
Domain hosting the adagents.json file
agent_url
URL of the sales agent to verify
property_identifiers
List of identifiers to match
property_type
Type of property (website, app, etc.) - optional
timeout
Request timeout in seconds
client
Optional httpx.AsyncClient for connection pooling

Returns

True if agent is authorized, False otherwise

Raises

AdagentsNotFoundError
If adagents.json not found (404)
AdagentsValidationError
If JSON is invalid or malformed
AdagentsTimeoutError
If request times out

Classes

class AuthorizationContext (properties: list[dict[str, Any]])
Expand source code
class AuthorizationContext:
    """What an agent is authorized for on one publisher domain.

    Attributes:
        property_ids: Non-empty string "id" values of the properties, in input order
        property_tags: Distinct string tags of the properties, in first-seen order
        raw_properties: The property list exactly as it was passed in
    """

    def __init__(self, properties: list[dict[str, Any]]):
        """Derive IDs and tags from a list of property dictionaries.

        Args:
            properties: Property dictionaries from adagents.json; entries
                that are not dictionaries are ignored when extracting IDs
                and tags but are still kept in raw_properties
        """
        usable = [entry for entry in properties if isinstance(entry, dict)]

        # IDs are kept as-is (duplicates included); only non-empty strings count
        self.property_ids: list[str] = [
            ident
            for ident in (entry.get("id") for entry in usable)
            if ident and isinstance(ident, str)
        ]

        # A dict keeps insertion order, giving first-seen de-duplication
        ordered_tags: dict[str, None] = {}
        for entry in usable:
            tag_field = entry.get("tags", [])
            if isinstance(tag_field, list):
                ordered_tags.update(
                    dict.fromkeys(tag for tag in tag_field if isinstance(tag, str))
                )
        self.property_tags: list[str] = list(ordered_tags)

        self.raw_properties = properties

    def __repr__(self) -> str:
        ids_part = f"property_ids={self.property_ids}"
        tags_part = f"property_tags={self.property_tags}"
        return f"AuthorizationContext({ids_part}, {tags_part})"

Authorization context for a publisher domain.

Attributes

property_ids
List of property IDs the agent is authorized for
property_tags
List of property tags the agent is authorized for
raw_properties
Raw property data from adagents.json

Initialize from list of properties.

Args

properties
List of property dictionaries from adagents.json