From 1043710b6df3a00079cdba70e45cdb8d29a7283e Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Thu, 16 Apr 2026 18:12:12 +0000 Subject: [PATCH] =?UTF-8?q?fix:=20track=20last=5Fpublished=5Fat=20per=20so?= =?UTF-8?q?urce=20to=20avoid=20re-fetching=20same=20articles=20=E2=80=94?= =?UTF-8?q?=20applies=20to=20both=20news=5Fapi=20and=20macro=5Fnews?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/adapters/macro_news_adapter.py | 8 ++++++++ services/adapters/news_adapter.py | 4 ++++ services/ingestion/worker.py | 17 +++++++++++++++++ 3 files changed, 29 insertions(+) diff --git a/services/adapters/macro_news_adapter.py b/services/adapters/macro_news_adapter.py index 24d5341..214b6fc 100644 --- a/services/adapters/macro_news_adapter.py +++ b/services/adapters/macro_news_adapter.py @@ -46,6 +46,9 @@ class MacroNewsAdapter(BaseAdapter): The ticker parameter is ignored for macro sources — these are global/geopolitical news, not company-specific. + Uses published_utc.gt to only fetch articles newer than the last + successful fetch, avoiding re-fetching the same articles. + Args: ticker: Ignored for macro sources (may be empty string). config: Source-specific configuration with url, params, etc. @@ -67,6 +70,11 @@ class MacroNewsAdapter(BaseAdapter): limit = config.get("limit", 20) params["limit"] = str(min(int(limit), 1000)) + # Use last_published_at from config to only fetch newer articles + last_published = config.get("last_published_at") + if last_published: + params["published_utc.gt"] = last_published + async with httpx.AsyncClient(timeout=30) as client: t0 = time.monotonic() try: diff --git a/services/adapters/news_adapter.py b/services/adapters/news_adapter.py index ddca7f3..a86dcb8 100644 --- a/services/adapters/news_adapter.py +++ b/services/adapters/news_adapter.py @@ -128,6 +128,10 @@ class PolygonNewsAdapter(NewsDataAdapter): if config.get("published_utc_lte"): params["published_utc.lte"] = config["published_utc_lte"] + # Auto-filter to only fetch articles newer than last successful fetch + if config.get("last_published_at") and "published_utc.gt" not in params: + params["published_utc.gt"] = config["last_published_at"] + url = f"{self.base_url}{self.NEWS_ENDPOINT}" return url, params diff --git a/services/ingestion/worker.py b/services/ingestion/worker.py index 79c61de..a502c63 100644 --- a/services/ingestion/worker.py +++ b/services/ingestion/worker.py @@ -195,6 +195,23 @@ async def process_job( extra={"ticker": ticker, "source_type": source_type, "count": new_items}, ) + # Track the latest published_utc so next fetch only gets newer articles + if source_type in ("macro_news", "news_api") and result.items: + latest_pub = None + for item in result.items: + pub = item.get("published_utc") + if pub and (latest_pub is None or pub > latest_pub): + latest_pub = pub + if latest_pub: + try: + await pool.execute( + "UPDATE sources SET config = config || $1::jsonb WHERE id = $2", + json.dumps({"last_published_at": latest_pub}), + source_id, + ) + except Exception: + pass # Non-critical + except Exception as e: INGESTION_ERRORS.labels(source_type=source_type).inc() INGESTION_JOBS_TOTAL.labels(source_type=source_type, status="error").inc()