Module adcp.registry_sync
Registry change feed synchronization.
Classes
class CursorStore (*args, **kwargs)-
Expand source code
@runtime_checkable class CursorStore(Protocol): """Protocol for persisting the feed cursor.""" async def load(self) -> str | None: """Load the saved cursor, or None if no cursor exists.""" ... async def save(self, cursor: str) -> None: """Save the current cursor.""" ...Protocol for persisting the feed cursor.
Ancestors
- typing.Protocol
- typing.Generic
Methods
async def load(self) ‑> str | None-
Expand source code
async def load(self) -> str | None: """Load the saved cursor, or None if no cursor exists.""" ...Load the saved cursor, or None if no cursor exists.
async def save(self, cursor: str) ‑> None-
Expand source code
async def save(self, cursor: str) -> None: """Save the current cursor.""" ...Save the current cursor.
class FileCursorStore (path: str | Path = '.adcp-sync-cursor.json')-
Expand source code
class FileCursorStore: """Default cursor store using a local JSON file. Args: path: Path to the cursor file. Defaults to .adcp-sync-cursor.json """ def __init__(self, path: str | Path = ".adcp-sync-cursor.json") -> None: self._path = Path(path) async def load(self) -> str | None: try: data = json.loads(self._path.read_text()) return data.get("cursor") # type: ignore[no-any-return] except (FileNotFoundError, json.JSONDecodeError, KeyError): return None async def save(self, cursor: str) -> None: temp = self._path.with_suffix(".tmp") temp.write_text(json.dumps({"cursor": cursor})) temp.replace(self._path) # Atomic renameDefault cursor store using a local JSON file.
Args
path- Path to the cursor file. Defaults to .adcp-sync-cursor.json
Methods
async def load(self) ‑> str | None-
Expand source code
async def load(self) -> str | None: try: data = json.loads(self._path.read_text()) return data.get("cursor") # type: ignore[no-any-return] except (FileNotFoundError, json.JSONDecodeError, KeyError): return None async def save(self, cursor: str) ‑> None-
Expand source code
async def save(self, cursor: str) -> None: temp = self._path.with_suffix(".tmp") temp.write_text(json.dumps({"cursor": cursor})) temp.replace(self._path) # Atomic rename
class RegistrySync (client: RegistryClient,
*,
auth_token: str,
poll_interval: float = 60.0,
cursor_store: CursorStore | None = None,
types: str | None = None,
batch_size: int = 100)-
Expand source code
class RegistrySync: """Polls the registry change feed and dispatches events to handlers. Args: client: RegistryClient instance for HTTP calls. auth_token: Bearer token for feed access. poll_interval: Seconds between polls (default 60). cursor_store: Optional CursorStore for persistence. Defaults to FileCursorStore. types: Optional event type filter (e.g., "property.*,agent.*"). batch_size: Max events per poll (default 100, max 10000). """ def __init__( self, client: RegistryClient, *, auth_token: str, poll_interval: float = 60.0, cursor_store: CursorStore | None = None, types: str | None = None, batch_size: int = 100, ) -> None: self._client = client self._auth_token = auth_token self._poll_interval = poll_interval self._cursor_store: CursorStore = cursor_store or FileCursorStore() self._types = types self._batch_size = min(batch_size, 10000) self._handlers: dict[str, list[ChangeHandler]] = defaultdict(list) self._all_handlers: list[ChangeHandler] = [] self._cursor: str | None = None self._cursor_loaded = False self._stop_event: asyncio.Event | None = None self._running = False def on(self, event_type: str, handler: ChangeHandler) -> None: """Register a handler for a specific event type. Supports glob patterns: "property.*" matches "property.created", "property.updated", etc. """ self._handlers[event_type].append(handler) def on_all(self, handler: ChangeHandler) -> None: """Register a handler for all events.""" self._all_handlers.append(handler) @property def cursor(self) -> str | None: """Current cursor position.""" return self._cursor async def _load_cursor(self) -> None: """Load cursor from store on first use.""" if not self._cursor_loaded: self._cursor = await self._cursor_store.load() self._cursor_loaded = True async def _dispatch(self, event: FeedEvent) -> None: """Dispatch a single event to matching handlers.""" # Dispatch to type-specific handlers for pattern, handlers in self._handlers.items(): if fnmatch(event.event_type, pattern): for handler in handlers: try: await handler(event) except Exception: logger.exception( "Handler error for event %s (%s)", event.event_id, event.event_type, ) # Dispatch to catch-all handlers for handler in self._all_handlers: try: await handler(event) except Exception: logger.exception( "Handler error for event %s (%s)", event.event_id, event.event_type, ) async def poll_once(self) -> list[FeedEvent]: """Poll the feed once and dispatch events. Returns the list of events processed. """ await self._load_cursor() try: page = await self._client.get_feed( auth_token=self._auth_token, cursor=self._cursor, types=self._types, limit=self._batch_size, ) except RegistryError as e: if e.status_code == 410: logger.warning("Feed cursor expired, resetting to start") self._cursor = None await self._cursor_store.save("") return [] raise for event in page.events: await self._dispatch(event) if page.cursor: self._cursor = page.cursor await self._cursor_store.save(page.cursor) return list(page.events) async def start(self) -> None: """Start the polling loop. Runs until stop() is called.""" if self._running: return self._running = True self._stop_event = asyncio.Event() logger.info("RegistrySync started (interval=%.1fs)", self._poll_interval) try: while not self._stop_event.is_set(): try: events = await self.poll_once() if events: logger.debug("Processed %d events", len(events)) except RegistryError as e: logger.error("Feed poll failed: %s", e) except Exception: logger.exception("Unexpected error in feed poll") # Wait for interval or stop signal try: await asyncio.wait_for( self._stop_event.wait(), timeout=self._poll_interval, ) except asyncio.TimeoutError: pass # Normal - poll interval elapsed finally: self._running = False logger.info("RegistrySync stopped") async def stop(self) -> None: """Stop the polling loop gracefully.""" if self._stop_event is not None: self._stop_event.set()Polls the registry change feed and dispatches events to handlers.
Args
client- RegistryClient instance for HTTP calls.
auth_token- Bearer token for feed access.
poll_interval- Seconds between polls (default 60).
cursor_store- Optional CursorStore for persistence. Defaults to FileCursorStore.
types- Optional event type filter (e.g., "property.,agent.").
batch_size- Max events per poll (default 100, max 10000).
Instance variables
prop cursor : str | None-
Expand source code
@property def cursor(self) -> str | None: """Current cursor position.""" return self._cursorCurrent cursor position.
Methods
def on(self, event_type: str, handler: ChangeHandler) ‑> None-
Expand source code
def on(self, event_type: str, handler: ChangeHandler) -> None: """Register a handler for a specific event type. Supports glob patterns: "property.*" matches "property.created", "property.updated", etc. """ self._handlers[event_type].append(handler)Register a handler for a specific event type.
Supports glob patterns: "property.*" matches "property.created", "property.updated", etc.
def on_all(self, handler: ChangeHandler) ‑> None-
Expand source code
def on_all(self, handler: ChangeHandler) -> None: """Register a handler for all events.""" self._all_handlers.append(handler)Register a handler for all events.
async def poll_once(self) ‑> list[FeedEvent]-
Expand source code
async def poll_once(self) -> list[FeedEvent]: """Poll the feed once and dispatch events. Returns the list of events processed. """ await self._load_cursor() try: page = await self._client.get_feed( auth_token=self._auth_token, cursor=self._cursor, types=self._types, limit=self._batch_size, ) except RegistryError as e: if e.status_code == 410: logger.warning("Feed cursor expired, resetting to start") self._cursor = None await self._cursor_store.save("") return [] raise for event in page.events: await self._dispatch(event) if page.cursor: self._cursor = page.cursor await self._cursor_store.save(page.cursor) return list(page.events)Poll the feed once and dispatch events.
Returns the list of events processed.
async def start(self) ‑> None-
Expand source code
async def start(self) -> None: """Start the polling loop. Runs until stop() is called.""" if self._running: return self._running = True self._stop_event = asyncio.Event() logger.info("RegistrySync started (interval=%.1fs)", self._poll_interval) try: while not self._stop_event.is_set(): try: events = await self.poll_once() if events: logger.debug("Processed %d events", len(events)) except RegistryError as e: logger.error("Feed poll failed: %s", e) except Exception: logger.exception("Unexpected error in feed poll") # Wait for interval or stop signal try: await asyncio.wait_for( self._stop_event.wait(), timeout=self._poll_interval, ) except asyncio.TimeoutError: pass # Normal - poll interval elapsed finally: self._running = False logger.info("RegistrySync stopped")Start the polling loop. Runs until stop() is called.
async def stop(self) ‑> None-
Expand source code
async def stop(self) -> None: """Stop the polling loop gracefully.""" if self._stop_event is not None: self._stop_event.set()Stop the polling loop gracefully.