fix: track last_published_at per source to avoid re-fetching same articles — applies to both news_api and macro_news
This commit is contained in:
@@ -46,6 +46,9 @@ class MacroNewsAdapter(BaseAdapter):
|
|||||||
The ticker parameter is ignored for macro sources — these are
|
The ticker parameter is ignored for macro sources — these are
|
||||||
global/geopolitical news, not company-specific.
|
global/geopolitical news, not company-specific.
|
||||||
|
|
||||||
|
Uses published_utc.gt to only fetch articles published after the
|
||||||
|
last_published_at value stored in config, avoiding re-fetching the same articles.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
ticker: Ignored for macro sources (may be empty string).
|
ticker: Ignored for macro sources (may be empty string).
|
||||||
config: Source-specific configuration with url, params, etc.
|
config: Source-specific configuration with url, params, etc.
|
||||||
@@ -67,6 +70,11 @@ class MacroNewsAdapter(BaseAdapter):
|
|||||||
limit = config.get("limit", 20)
|
limit = config.get("limit", 20)
|
||||||
params["limit"] = str(min(int(limit), 1000))
|
params["limit"] = str(min(int(limit), 1000))
|
||||||
|
|
||||||
|
# Use last_published_at from config to only fetch newer articles
|
||||||
|
last_published = config.get("last_published_at")
|
||||||
|
if last_published:
|
||||||
|
params["published_utc.gt"] = last_published
|
||||||
|
|
||||||
async with httpx.AsyncClient(timeout=30) as client:
|
async with httpx.AsyncClient(timeout=30) as client:
|
||||||
t0 = time.monotonic()
|
t0 = time.monotonic()
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -128,6 +128,10 @@ class PolygonNewsAdapter(NewsDataAdapter):
|
|||||||
if config.get("published_utc_lte"):
|
if config.get("published_utc_lte"):
|
||||||
params["published_utc.lte"] = config["published_utc_lte"]
|
params["published_utc.lte"] = config["published_utc_lte"]
|
||||||
|
|
||||||
|
# Auto-filter: unless published_utc.gt is already set, only fetch articles published after the stored last_published_at
|
||||||
|
if config.get("last_published_at") and "published_utc.gt" not in params:
|
||||||
|
params["published_utc.gt"] = config["last_published_at"]
|
||||||
|
|
||||||
url = f"{self.base_url}{self.NEWS_ENDPOINT}"
|
url = f"{self.base_url}{self.NEWS_ENDPOINT}"
|
||||||
return url, params
|
return url, params
|
||||||
|
|
||||||
|
|||||||
@@ -195,6 +195,23 @@ async def process_job(
|
|||||||
extra={"ticker": ticker, "source_type": source_type, "count": new_items},
|
extra={"ticker": ticker, "source_type": source_type, "count": new_items},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Track the latest published_utc so next fetch only gets newer articles
|
||||||
|
if source_type in ("macro_news", "news_api") and result.items:
|
||||||
|
latest_pub = None
|
||||||
|
for item in result.items:
|
||||||
|
pub = item.get("published_utc")
|
||||||
|
if pub and (latest_pub is None or pub > latest_pub):
|
||||||
|
latest_pub = pub
|
||||||
|
if latest_pub:
|
||||||
|
try:
|
||||||
|
await pool.execute(
|
||||||
|
"UPDATE sources SET config = config || $1::jsonb WHERE id = $2",
|
||||||
|
json.dumps({"last_published_at": latest_pub}),
|
||||||
|
source_id,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass # Non-critical
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
INGESTION_ERRORS.labels(source_type=source_type).inc()
|
INGESTION_ERRORS.labels(source_type=source_type).inc()
|
||||||
INGESTION_JOBS_TOTAL.labels(source_type=source_type, status="error").inc()
|
INGESTION_JOBS_TOTAL.labels(source_type=source_type, status="error").inc()
|
||||||
|
|||||||
Reference in New Issue
Block a user