# Listing header carried over from the file viewer: "Files", 171 lines, 6.2 KiB, Python.

"""News API adapter interface and concrete Polygon.io news provider.
The NewsDataAdapter is the abstract interface for all news data providers.
PolygonNewsAdapter is the first concrete implementation, targeting the
Polygon.io REST API for company-linked news articles and headlines.
Requirements: 2.2, 2.5, 3.1, 3.2, 3.3
"""
import hashlib
import logging
import time
from abc import ABC
from datetime import datetime, timezone
from typing import Any
import httpx
from .base import AdapterResult, BaseAdapter
# Module-level logger. It is registered under the fixed name "news_adapter"
# (not the module's __name__), so logging configuration must target that name.
logger = logging.getLogger("news_adapter")
class NewsDataAdapter(BaseAdapter, ABC):
    """Common abstract base for every news data provider.

    Concrete providers supply their own ``fetch()`` for the news API they
    wrap. ``source_type()`` is implemented once here because the source type
    string is the same for all news adapters.
    """

    def source_type(self) -> str:
        """Return the source type identifier shared by all news adapters."""
        return "news_api"
class PolygonNewsAdapter(NewsDataAdapter):
    """Concrete adapter for the Polygon.io ticker news endpoint.

    Supports:
        - Ticker news (/v2/reference/news?ticker={ticker})

    Config options:
        limit: Max articles to return per request (default 20). Values are
            clamped to the range 1..1000; a missing, ``None`` or non-numeric
            value falls back to the default instead of raising.
        published_utc_gte: Only articles published on or after this date
            (YYYY-MM-DD).
        published_utc_lte: Only articles published on or before this date
            (YYYY-MM-DD).
        order: Sort order for results, "asc" or "desc" (default "desc").
        last_published_at: When set, sent as ``published_utc.gt`` so that only
            articles newer than that value are requested. It is sent in
            addition to ``published_utc_gte`` when both are configured.

    The API key is sent as the ``apiKey`` query parameter. Because HTTP client
    error messages can embed the full request URL, every error string that is
    logged or stored in an ``AdapterResult`` is passed through ``_redact()``
    first so the key does not leak.
    """

    NEWS_ENDPOINT = "/v2/reference/news"
    DEFAULT_LIMIT = 20
    MAX_LIMIT = 1000

    # (config key, Polygon query parameter) pairs copied into the request when
    # the config value is truthy. Order is preserved in the outgoing params.
    _OPTIONAL_PARAMS: tuple[tuple[str, str], ...] = (
        ("order", "order"),
        ("published_utc_gte", "published_utc.gte"),
        ("published_utc_lte", "published_utc.lte"),
        ("last_published_at", "published_utc.gt"),
    )

    def __init__(self, api_key: str, base_url: str = "https://api.polygon.io") -> None:
        """Store the credentials and the base URL (without a trailing slash).

        Args:
            api_key: Polygon.io API key sent with every request.
            base_url: Root URL of the Polygon REST API.
        """
        self.api_key: str = api_key
        self.base_url: str = base_url.rstrip("/")

    async def fetch(self, ticker: str, config: dict[str, Any]) -> AdapterResult:
        """Fetch news articles from Polygon.io for a given ticker.

        Failures while performing the request or decoding the response are
        reported through an error ``AdapterResult`` (``error`` set, empty
        ``items``) rather than raised.

        Args:
            ticker: The company ticker symbol.
            config: Source-specific configuration from the sources table.

        Returns:
            AdapterResult with raw payload, parsed article items, and metadata,
            or an error result describing what went wrong.
        """
        url, params = self._build_request(ticker, config)
        async with httpx.AsyncClient(timeout=30) as client:
            t0 = time.monotonic()
            try:
                resp = await client.get(url, params=params)
                elapsed_ms = self._elapsed_ms(t0)
                resp.raise_for_status()
                raw = resp.content

                # A 2xx response with an undecodable body is still a failure,
                # but keep the status code and payload for diagnosis.
                try:
                    data = resp.json()
                except ValueError as e:
                    message = self._redact(f"invalid JSON response: {e}")
                    logger.error("Polygon news fetch failed for %s: %s", ticker, message)
                    return self._error_result(
                        ticker, message, elapsed_ms,
                        http_status=resp.status_code, raw=raw,
                    )
                if not isinstance(data, dict):
                    message = (
                        f"unexpected response payload type: {type(data).__name__}"
                    )
                    logger.error("Polygon news fetch failed for %s: %s", ticker, message)
                    return self._error_result(
                        ticker, message, elapsed_ms,
                        http_status=resp.status_code, raw=raw,
                    )

                items = self._extract_items(data)
                return AdapterResult(
                    source_type="news_api",
                    ticker=ticker,
                    items=items,
                    raw_payload=raw,
                    content_hash=hashlib.sha256(raw).hexdigest(),
                    fetched_at=datetime.now(timezone.utc),
                    http_status=resp.status_code,
                    response_time_ms=round(elapsed_ms, 1),
                    metadata={
                        "provider": "polygon",
                        "results_count": data.get("count", len(items)),
                        "next_url": data.get("next_url", ""),
                        "request_id": data.get("request_id", ""),
                    },
                )
            except httpx.HTTPStatusError as e:
                elapsed_ms = self._elapsed_ms(t0)
                # The exception text contains the request URL, and therefore
                # the apiKey query parameter; scrub it before it goes anywhere.
                message = self._redact(str(e))
                logger.error("Polygon news HTTP error for %s: %s", ticker, message)
                response = e.response
                return self._error_result(
                    ticker, message, elapsed_ms,
                    http_status=response.status_code if response is not None else None,
                    raw=response.content if response is not None else b"",
                )
            except httpx.TimeoutException as e:
                elapsed_ms = self._elapsed_ms(t0)
                message = self._redact(f"timeout: {e}")
                logger.error("Polygon news timeout for %s: %s", ticker, message)
                return self._error_result(ticker, message, elapsed_ms)
            except Exception as e:
                elapsed_ms = self._elapsed_ms(t0)
                # Deliberately not logger.exception(): the traceback would
                # repeat the unredacted exception message.
                message = self._redact(str(e))
                logger.error("Polygon news fetch failed for %s: %s", ticker, message)
                return self._error_result(ticker, message, elapsed_ms)

    @staticmethod
    def _elapsed_ms(started: float) -> float:
        """Return milliseconds elapsed since the ``time.monotonic()`` value *started*."""
        return (time.monotonic() - started) * 1000

    def _redact(self, message: str) -> str:
        """Return *message* with every occurrence of the API key masked."""
        # Guard the empty key: str.replace("", ...) would insert the mask
        # between every character of the message.
        if not self.api_key:
            return message
        return message.replace(self.api_key, "***")

    def _resolve_limit(self, requested: Any) -> int:
        """Coerce the configured limit to an int within 1..MAX_LIMIT.

        ``None`` silently selects the default; values that cannot be converted
        to an integer select the default and emit a warning.
        """
        if requested is None:
            return self.DEFAULT_LIMIT
        try:
            limit = int(requested)
        except (TypeError, ValueError, OverflowError):
            logger.warning(
                "Invalid Polygon news limit %r; using default %d",
                requested, self.DEFAULT_LIMIT,
            )
            return self.DEFAULT_LIMIT
        return max(1, min(limit, self.MAX_LIMIT))

    def _build_request(
        self, ticker: str, config: dict[str, Any]
    ) -> tuple[str, dict[str, str]]:
        """Build the URL and query params for a Polygon news request.

        Args:
            ticker: The company ticker symbol to filter on.
            config: Source-specific configuration (see the class docstring).

        Returns:
            A ``(url, params)`` tuple ready to pass to the HTTP client.
        """
        params: dict[str, str] = {
            "apiKey": self.api_key,
            "ticker": ticker,
            "limit": str(self._resolve_limit(config.get("limit"))),
        }
        # Optional filters are forwarded only when configured with a truthy
        # value. last_published_at maps to published_utc.gt, which restricts
        # the fetch to articles newer than the last successful one.
        for config_key, param_name in self._OPTIONAL_PARAMS:
            value = config.get(config_key)
            if value:
                params[param_name] = value
        return f"{self.base_url}{self.NEWS_ENDPOINT}", params

    def _extract_items(self, data: dict[str, Any]) -> list[dict[str, Any]]:
        """Extract the article list from a Polygon news response.

        Polygon returns articles under the "results" key as a list of objects,
        each containing fields like id, publisher, title, article_url, tickers,
        published_utc, description, and keywords.

        Returns:
            The value of ``data["results"]`` when it is a list, otherwise an
            empty list (key missing or holding another type).
        """
        results = data.get("results", [])
        if isinstance(results, list):
            return results
        return []

    def _error_result(
        self,
        ticker: str,
        error: str,
        elapsed_ms: float,
        http_status: int | None = None,
        raw: bytes = b"",
    ) -> AdapterResult:
        """Build an error AdapterResult for news fetches.

        Args:
            ticker: The ticker the failed request was made for.
            error: Human-readable description of the failure.
            elapsed_ms: Time spent on the request, in milliseconds.
            http_status: HTTP status code, when a response was received.
            raw: Raw response body, when one was received.

        Returns:
            An AdapterResult with no items, an empty content hash, and
            ``error`` populated.
        """
        return AdapterResult(
            source_type="news_api",
            ticker=ticker,
            items=[],
            raw_payload=raw,
            content_hash="",
            fetched_at=datetime.now(timezone.utc),
            error=error,
            http_status=http_status,
            response_time_ms=round(elapsed_ms, 1),
            metadata={"provider": "polygon"},
        )