From 311d76dc0b388068b3032df5552edbcdaa6a007f Mon Sep 17 00:00:00 2001
From: Celes Renata
Date: Sun, 12 Apr 2026 09:42:12 -0700
Subject: [PATCH] phase 17: enrich SEC EDGAR filings with URLs, titles, dedupe by accession number, skip XML fragments

---
Review notes (not part of the commit message):
- _extract_items now skips malformed hits instead of raising.
- check_edgar_urls.py releases the MinIO response; check_queues.py no
  longer interpolates the password into a redis:// URL.
- Blob ids on the "index" lines predate these review edits.

 scripts/check_edgar_urls.py          | 44 ++++++++++++
 scripts/check_queues.py              | 23 ++++++
 services/adapters/filings_adapter.py | 88 ++++++++++++++++++++++---
 3 files changed, 148 insertions(+), 7 deletions(-)
 create mode 100644 scripts/check_edgar_urls.py
 create mode 100644 scripts/check_queues.py

diff --git a/scripts/check_edgar_urls.py b/scripts/check_edgar_urls.py
new file mode 100644
index 0000000..c56eb0c
--- /dev/null
+++ b/scripts/check_edgar_urls.py
@@ -0,0 +1,44 @@
+"""Print filing-index URLs for the first hits of one raw-filings object."""
+import itertools
+import json
+import os
+
+from minio import Minio
+
+BUCKET = "stonks-raw-filings"
+
+mc = Minio(
+    os.environ["MINIO_ENDPOINT"],
+    access_key=os.environ["MINIO_ACCESS_KEY"],
+    secret_key=os.environ["MINIO_SECRET_KEY"],
+    secure=False,
+)
+
+# Only one object is inspected, so stop iterating the listing after the first.
+for obj in itertools.islice(mc.list_objects(BUCKET, recursive=True), 1):
+    resp = mc.get_object(BUCKET, obj.object_name)
+    try:
+        data = json.loads(resp.read())
+    finally:
+        # Release the HTTP response even when the payload is not valid JSON.
+        resp.close()
+        resp.release_conn()
+    hits = data.get("hits", {}).get("hits", [])
+    for h in hits[:5]:
+        src = h.get("_source", {})
+        adsh = src.get("adsh", "")
+        ciks = src.get("ciks", [])
+        form = src.get("form", "")
+        names = src.get("display_names", [])
+        file_desc = src.get("file_description", "")
+        file_date = src.get("file_date", "")
+        file_type = src.get("file_type", "")
+        if adsh and ciks:
+            cik = ciks[0].lstrip("0")
+            adsh_nodash = adsh.replace("-", "")
+            url = f"https://www.sec.gov/Archives/edgar/data/{cik}/{adsh_nodash}/{adsh}-index.htm"
+            print(f"form={form} type={file_type} date={file_date}")
+            print(f" names={names}")
+            print(f" desc={file_desc}")
+            print(f" index_url={url}")
+            print()
diff --git a/scripts/check_queues.py b/scripts/check_queues.py
new file mode 100644
index 0000000..5217d75
--- /dev/null
+++ b/scripts/check_queues.py
@@ -0,0 +1,23 @@
+"""Print the depth of each stonks queue and of any non-empty dead-letter queue."""
+import os
+
+import redis
+
+# Pass connection fields as keyword arguments instead of interpolating them
+# into a redis:// URL, where a password containing '@', ':' or '/' would
+# have to be percent-encoded to parse correctly.
+r = redis.Redis(
+    host=os.environ["REDIS_HOST"],
+    port=int(os.environ["REDIS_PORT"]),
+    password=os.environ.get("REDIS_PASSWORD") or None,
+    db=0,
+)
+for q in ["ingestion","parsing","extraction","aggregation","recommendation","lake_publish","broker_orders"]:
+    depth = r.llen(f"stonks:queue:{q}")
+    print(f" {q:20} {depth:>4} pending")
+
+# Check dead letter queues
+for q in ["ingestion","parsing","extraction","aggregation","recommendation"]:
+    depth = r.llen(f"stonks:dlq:{q}")
+    if depth > 0:
+        print(f" DLQ {q:16} {depth:>4} dead letters")
diff --git a/services/adapters/filings_adapter.py b/services/adapters/filings_adapter.py
index 0c67461..8821d9e 100644
--- a/services/adapters/filings_adapter.py
+++ b/services/adapters/filings_adapter.py
@@ -152,19 +152,93 @@ class SECEdgarAdapter(FilingsDataAdapter):
         return url, params, headers
 
     def _extract_items(self, data: dict[str, Any]) -> list[dict[str, Any]]:
-        """Extract the filing hits from an EDGAR EFTS response.
+        """Extract filing hits from EDGAR EFTS, enrich with fetchable URLs.
 
-        EFTS returns results under hits.hits as a list of objects,
-        each containing _source with fields like file_date, form_type,
-        entity_name, file_num, and period_of_report.
+        EFTS returns results under hits.hits. Each hit has _source with
+        adsh, ciks, form, file_type, file_description, and file_date.
+        We construct the SEC EDGAR filing index URL from these fields,
+        keep one item per accession number, and skip hits whose
+        file_type is XML, GRAPHIC, ZIP or EXCEL.
+
+        Malformed hits (non-dict hit or _source, non-string adsh, missing
+        or all-zero CIK) are skipped rather than raising.
+
+        Args:
+            data: Decoded EFTS JSON response.
+
+        Returns:
+            One enriched copy of _source per kept filing, with url,
+            article_url, title, name, published_utc and publisher added.
         """
         hits_wrapper = data.get("hits", {})
         if not isinstance(hits_wrapper, dict):
             return []
         hits = hits_wrapper.get("hits", [])
-        if isinstance(hits, list):
-            return hits
-        return []
+        if not isinstance(hits, list):
+            return []
+
+        # file_type values treated as fragments rather than filing documents
+        skipped_types = ("XML", "GRAPHIC", "ZIP", "EXCEL")
+
+        # Dedupe by adsh (accession number) — keep one item per filing
+        seen_adsh: set[str] = set()
+        items: list[dict[str, Any]] = []
+
+        for hit in hits:
+            if not isinstance(hit, dict):
+                continue
+            src = hit.get("_source", {})
+            if not isinstance(src, dict):
+                continue
+
+            adsh = src.get("adsh", "")
+            # The str check also keeps unhashable values out of the set lookup
+            if not isinstance(adsh, str) or not adsh or adsh in seen_adsh:
+                continue
+
+            ciks = src.get("ciks", [])
+            if not isinstance(ciks, list) or not ciks:
+                continue
+            # Strip zero padding; an all-zero CIK would leave an empty path segment
+            cik = str(ciks[0]).lstrip("0")
+            if not cik:
+                continue
+
+            # Skip XML data fragments and non-primary documents
+            if src.get("file_type", "") in skipped_types:
+                continue
+
+            # Mark as seen only once a hit is accepted, so a skipped
+            # fragment does not hide a later document of the same filing
+            seen_adsh.add(adsh)
+
+            # Build the filing index URL
+            adsh_nodash = adsh.replace("-", "")
+            filing_index_url = (
+                f"https://www.sec.gov/Archives/edgar/data/{cik}/{adsh_nodash}/{adsh}-index.htm"
+            )
+
+            # Build a title from the metadata
+            form = src.get("form", "")
+            names = src.get("display_names", [])
+            first_name = names[0] if isinstance(names, list) and names else ""
+            # Keep only the text before a "(CIK" suffix, if one is present
+            entity_name = str(first_name).split("(CIK")[0].strip()
+            file_date = src.get("file_date", "")
+            file_desc = src.get("file_description", "")
+            title = f"{form}: {entity_name}" + (f" - {file_desc}" if file_desc else "")
+
+            # Enrich the item with URL and structured fields
+            enriched = dict(src)
+            enriched["url"] = filing_index_url
+            enriched["article_url"] = filing_index_url  # compat with news URL field
+            enriched["title"] = title
+            enriched["name"] = title
+            enriched["published_utc"] = f"{file_date}T00:00:00Z" if file_date else None
+            enriched["publisher"] = "SEC EDGAR"
+            items.append(enriched)
+
+        return items
 
     def _total_hits(self, data: dict[str, Any]) -> int:
         """Extract total hit count from EFTS response."""