"""Market data API adapter interface and concrete Polygon.io provider. The MarketDataAdapter is the abstract interface for all market data providers. PolygonMarketAdapter is the first concrete implementation, targeting the Polygon.io REST API for previous-day bars, quotes, and ticker details. Requirements: 2.1, 2.5, 3.1, 3.2, 3.3 """ import hashlib import logging import time from datetime import datetime, timezone from typing import Any import httpx from .base import AdapterResult, BaseAdapter logger = logging.getLogger("market_adapter") class MarketDataAdapter(BaseAdapter): """Abstract interface for market data providers. Subclasses implement fetch() for their specific market data API. """ def source_type(self) -> str: return "market_api" class PolygonMarketAdapter(MarketDataAdapter): """Concrete adapter for the Polygon.io REST API. Supports: - Previous-day aggregate bars (/v2/aggs/ticker/{ticker}/prev) - Grouped daily bars (/v2/aggs/grouped/locale/us/market/stocks/{date}) - Ticker details (/v3/reference/tickers/{ticker}) The endpoint is selected via the source config's "endpoint" field, defaulting to previous-day bars. """ PREV_BARS = "/v2/aggs/ticker/{ticker}/prev" RANGE_BARS = "/v2/aggs/ticker/{ticker}/range/{multiplier}/{timespan}/{from_date}/{to_date}" TICKER_DETAILS = "/v3/reference/tickers/{ticker}" def __init__(self, api_key: str, base_url: str = "https://api.polygon.io") -> None: self.api_key: str = api_key self.base_url: str = base_url.rstrip("/") async def fetch(self, ticker: str, config: dict[str, Any]) -> AdapterResult: """Fetch market data from Polygon.io for a given ticker. Config options: endpoint: One of "prev_bars" (default), "range_bars", "ticker_details" multiplier: Bar multiplier for range queries (default 1) timespan: Bar timespan for range queries (default "day") from_date: Start date for range queries (YYYY-MM-DD) to_date: End date for range queries (YYYY-MM-DD) adjusted: Whether bars are adjusted for splits (default true) """ endpoint_key = config.get("endpoint", "prev_bars") url, params = self._build_request(ticker, endpoint_key, config) async with httpx.AsyncClient(timeout=30) as client: t0 = time.monotonic() try: resp = await client.get(url, params=params) elapsed_ms = (time.monotonic() - t0) * 1000 resp.raise_for_status() raw = resp.content data = resp.json() content_hash = hashlib.sha256(raw).hexdigest() items = self._extract_items(data, endpoint_key) return AdapterResult( source_type="market_api", ticker=ticker, items=items, raw_payload=raw, content_hash=content_hash, fetched_at=datetime.now(timezone.utc), http_status=resp.status_code, response_time_ms=round(elapsed_ms, 1), metadata={ "provider": "polygon", "endpoint": endpoint_key, "results_count": data.get("resultsCount", len(items)), "request_id": data.get("request_id", ""), }, ) except httpx.HTTPStatusError as e: elapsed_ms = (time.monotonic() - t0) * 1000 logger.error("Polygon HTTP error for %s: %s", ticker, e) return self._error_result( ticker, str(e), elapsed_ms, http_status=e.response.status_code if e.response else None, raw=e.response.content if e.response else b"", ) except httpx.TimeoutException as e: elapsed_ms = (time.monotonic() - t0) * 1000 logger.error("Polygon timeout for %s: %s", ticker, e) return self._error_result(ticker, f"timeout: {e}", elapsed_ms) except Exception as e: elapsed_ms = (time.monotonic() - t0) * 1000 logger.error("Polygon fetch failed for %s: %s", ticker, e) return self._error_result(ticker, str(e), elapsed_ms) def _build_request( self, ticker: str, endpoint_key: str, config: dict[str, Any] ) -> tuple[str, dict[str, str]]: """Build the URL and query params for a Polygon request.""" params: dict[str, str] = {"apiKey": self.api_key} if endpoint_key == "range_bars": multiplier = str(config.get("multiplier", 1)) timespan = config.get("timespan", "day") from_date = config.get("from_date", "") to_date = config.get("to_date", "") path = self.RANGE_BARS.format( ticker=ticker, multiplier=multiplier, timespan=timespan, from_date=from_date, to_date=to_date, ) if config.get("adjusted") is not None: params["adjusted"] = str(config["adjusted"]).lower() if config.get("sort"): params["sort"] = config["sort"] if config.get("limit"): params["limit"] = str(config["limit"]) elif endpoint_key == "ticker_details": path = self.TICKER_DETAILS.format(ticker=ticker) else: # Default: previous-day bars path = self.PREV_BARS.format(ticker=ticker) if config.get("adjusted") is not None: params["adjusted"] = str(config["adjusted"]).lower() return f"{self.base_url}{path}", params def _extract_items(self, data: dict[str, Any], endpoint_key: str) -> list[dict[str, Any]]: """Extract the relevant items list from a Polygon response.""" if endpoint_key == "ticker_details": results = data.get("results", {}) return [results] if isinstance(results, dict) and results else [] # Aggregate endpoints return results as a list results = data.get("results", []) if isinstance(results, list): return results return [results] if results else [] def _error_result( self, ticker: str, error: str, elapsed_ms: float, http_status: int | None = None, raw: bytes = b"", ) -> AdapterResult: """Build an error AdapterResult.""" return AdapterResult( source_type="market_api", ticker=ticker, items=[], raw_payload=raw, content_hash="", fetched_at=datetime.now(timezone.utc), error=error, http_status=http_status, response_time_ms=round(elapsed_ms, 1), metadata={"provider": "polygon"}, )