Files
stonks-oracle/services/adapters/filings_adapter.py
T

253 lines
9.3 KiB
Python

"""Filings / Regulatory API adapter interface and concrete SEC EDGAR provider.
The FilingsDataAdapter is the abstract interface for all filings data providers.
SECEdgarAdapter is the first concrete implementation, targeting the SEC EDGAR
full-text search system (EFTS) for company filings discovery.
Requirements: 2.3, 2.5, 3.1, 3.2, 3.3
"""
import hashlib
import logging
import time
from abc import ABC
from datetime import datetime, timezone
from typing import Any
import httpx
from .base import AdapterResult, BaseAdapter
logger = logging.getLogger("filings_adapter")
class FilingsDataAdapter(BaseAdapter, ABC):
    """Common base for adapters that pull filings / regulatory data.

    Concrete providers supply their own fetch() for the filings API they
    target. source_type() is implemented here because every filings adapter
    reports the same source type.
    """

    def source_type(self) -> str:
        """Return the source-type tag shared by all filings adapters."""
        filings_source_type = "filings_api"
        return filings_source_type
class SECEdgarAdapter(FilingsDataAdapter):
"""Concrete adapter for the SEC EDGAR full-text search system (EFTS).
Supports:
- Full-text search (/LATEST/search-index) for 8-K, 10-Q, 10-K, and other forms
- Filtering by date range, form type, and entity
The SEC EDGAR EFTS API is public and does not require an API key,
but requires a descriptive User-Agent header per SEC fair-access policy.
Config options:
cik: Company CIK number (optional, narrows search)
forms: Comma-separated form types to search (default "8-K,10-Q,10-K")
start_date: Only filings on or after this date, YYYY-MM-DD (optional)
end_date: Only filings on or before this date, YYYY-MM-DD (optional)
query: Custom search query override (optional, replaces ticker-based query)
"""
SEARCH_ENDPOINT: str = "/LATEST/search-index"
def __init__(
self,
base_url: str = "https://efts.sec.gov",
user_agent: str = "StonksOracle/1.0 ([email])",
) -> None:
self.base_url: str = base_url.rstrip("/")
self.user_agent: str = user_agent
async def fetch(self, ticker: str, config: dict[str, Any]) -> AdapterResult:
    """Fetch filings from SEC EDGAR EFTS for a given ticker.

    Failures that happen while sending the request or processing the
    response (HTTP error status, timeout, unparsable body, ...) are logged
    and reported through an error AdapterResult instead of being raised.

    Args:
        ticker: The company ticker symbol.
        config: Source-specific configuration from the sources table.

    Returns:
        AdapterResult with raw payload, parsed filing items, and metadata,
        or an error AdapterResult (empty items, ``error`` set) on failure.
    """
    url, params, headers = self._build_request(ticker, config)
    async with httpx.AsyncClient(timeout=30) as client:
        t0 = time.monotonic()
        # Bound before the try block so the catch-all handler can still
        # report the status and body when the failure happens *after* the
        # response arrived (e.g. a 200 whose body is not valid JSON).
        resp = None
        try:
            resp = await client.get(url, params=params, headers=headers)
            elapsed_ms = (time.monotonic() - t0) * 1000
            resp.raise_for_status()
            raw = resp.content
            data = resp.json()
            content_hash = hashlib.sha256(raw).hexdigest()
            items = self._extract_items(data)
            return AdapterResult(
                source_type="filings_api",
                ticker=ticker,
                items=items,
                raw_payload=raw,
                content_hash=content_hash,
                fetched_at=datetime.now(timezone.utc),
                http_status=resp.status_code,
                response_time_ms=round(elapsed_ms, 1),
                metadata={
                    "provider": "sec_edgar",
                    "results_count": len(items),
                    "total_hits": self._total_hits(data),
                    "query": params.get("q", ""),
                    "forms": params.get("forms", ""),
                },
            )
        except httpx.HTTPStatusError as e:
            elapsed_ms = (time.monotonic() - t0) * 1000
            logger.error("SEC EDGAR HTTP error for %s: %s", ticker, e)
            # Identity check rather than truthiness: whether a response
            # exists must not depend on how the response object evaluates
            # in a boolean context.
            failed = e.response
            return self._error_result(
                ticker, str(e), elapsed_ms,
                http_status=failed.status_code if failed is not None else None,
                raw=failed.content if failed is not None else b"",
            )
        except httpx.TimeoutException as e:
            elapsed_ms = (time.monotonic() - t0) * 1000
            logger.error("SEC EDGAR timeout for %s: %s", ticker, e)
            return self._error_result(ticker, f"timeout: {e}", elapsed_ms)
        except Exception as e:
            elapsed_ms = (time.monotonic() - t0) * 1000
            logger.error("SEC EDGAR fetch failed for %s: %s", ticker, e)
            # Keep whatever the server did send so the failure can be
            # diagnosed from the stored result.
            return self._error_result(
                ticker, str(e), elapsed_ms,
                http_status=resp.status_code if resp is not None else None,
                raw=resp.content if resp is not None else b"",
            )
def _build_request(
self, ticker: str, config: dict[str, Any]
) -> tuple[str, dict[str, str], dict[str, str]]:
"""Build the URL, query params, and headers for an EDGAR EFTS request."""
params: dict[str, str] = {}
headers: dict[str, str] = {"User-Agent": self.user_agent}
# Query: use custom override or default to ticker-based search
query = config.get("query")
if query:
params["q"] = str(query)
else:
params["q"] = f'"{ticker}"'
# Form types filter
forms = config.get("forms", "8-K,10-Q,10-K")
params["forms"] = str(forms)
# Date range
if config.get("start_date"):
params["dateRange"] = "custom"
params["startdt"] = str(config["start_date"])
if config.get("end_date"):
params["dateRange"] = "custom"
params["enddt"] = str(config["end_date"])
# CIK filter (entity-level narrowing)
cik = config.get("cik")
if cik:
params["q"] = f'{params["q"]} AND cik:{cik}'
url = f"{self.base_url}{self.SEARCH_ENDPOINT}"
return url, params, headers
def _extract_items(self, data: dict[str, Any]) -> list[dict[str, Any]]:
"""Extract filing hits from EDGAR EFTS, enrich with fetchable URLs.
EFTS returns results under hits.hits. Each hit has _source with
adsh, ciks, form, file_type, file_description, and file_date.
We construct the SEC EDGAR document URL from these fields and
filter to primary filing documents (not XML fragments or exhibits).
"""
hits_wrapper = data.get("hits", {})
if not isinstance(hits_wrapper, dict):
return []
hits = hits_wrapper.get("hits", [])
if not isinstance(hits, list):
return []
# Dedupe by adsh (accession number) — keep one item per filing
seen_adsh: set[str] = set()
items: list[dict[str, Any]] = []
for hit in hits:
src = hit.get("_source", {})
if not isinstance(src, dict):
continue
adsh = src.get("adsh", "")
if not adsh or adsh in seen_adsh:
continue
ciks = src.get("ciks", [])
if not ciks:
continue
# Skip XML data fragments and non-primary documents
file_type = src.get("file_type", "")
if file_type in ("XML", "GRAPHIC", "ZIP", "EXCEL"):
continue
seen_adsh.add(adsh)
# Build the filing index URL
cik = ciks[0].lstrip("0")
adsh_nodash = adsh.replace("-", "")
filing_index_url = (
f"https://www.sec.gov/Archives/edgar/data/{cik}/{adsh_nodash}/{adsh}-index.htm"
)
# Build a title from the metadata
form = src.get("form", "")
names = src.get("display_names", [])
entity_name = names[0].split("(CIK")[0].strip() if names else ""
file_date = src.get("file_date", "")
file_desc = src.get("file_description", "")
title = f"{form}: {entity_name}" + (f" - {file_desc}" if file_desc else "")
# Enrich the item with URL and structured fields
enriched = dict(src)
enriched["url"] = filing_index_url
enriched["article_url"] = filing_index_url # compat with news URL field
enriched["title"] = title
enriched["name"] = title
enriched["published_utc"] = f"{file_date}T00:00:00Z" if file_date else None
enriched["publisher"] = "SEC EDGAR"
items.append(enriched)
return items
def _total_hits(self, data: dict[str, Any]) -> int:
"""Extract total hit count from EFTS response."""
hits_wrapper = data.get("hits", {})
if not isinstance(hits_wrapper, dict):
return 0
total = hits_wrapper.get("total", {})
if isinstance(total, dict):
return int(total.get("value", 0))
if isinstance(total, int):
return total
return 0
def _error_result(
    self,
    ticker: str,
    error: str,
    elapsed_ms: float,
    http_status: int | None = None,
    raw: bytes = b"",
) -> AdapterResult:
    """Package a failed filings fetch as an AdapterResult.

    Args:
        ticker: Ticker the failed request was made for.
        error: Description of the failure, stored on the result.
        elapsed_ms: Time spent on the request, in milliseconds.
        http_status: HTTP status code, when a response was received.
        raw: Raw response body, when one is available.

    Returns:
        An AdapterResult with no items, an empty content hash, and the
        ``error`` field set.
    """
    failure_fields: dict[str, Any] = {
        "source_type": "filings_api",
        "ticker": ticker,
        "items": [],
        "raw_payload": raw,
        "content_hash": "",
        "fetched_at": datetime.now(timezone.utc),
        "error": error,
        "http_status": http_status,
        # Reported with one decimal place, like successful fetches.
        "response_time_ms": round(elapsed_ms, 1),
        "metadata": {"provider": "sec_edgar"},
    }
    return AdapterResult(**failure_fields)