From 46c24aefab95e87a64c4a0bf5c088966570b7472 Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Mon, 20 Apr 2026 17:24:53 +0000 Subject: [PATCH] fix: prevent duplicate queue entries with Redis SET markers Recovery sweeps and the retry endpoint now check a per-document Redis key (SET NX, 1h TTL) before pushing to the queue. If the marker exists, the doc is already enqueued and gets skipped. This prevents the scheduler from re-enqueuing the same parsed docs every 5 minutes. --- services/api/app.py | 13 ++++++---- services/scheduler/app.py | 50 +++++++++++++++++++++++++++++---------- 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/services/api/app.py b/services/api/app.py index 1275b15..7450cbc 100644 --- a/services/api/app.py +++ b/services/api/app.py @@ -1891,6 +1891,7 @@ async def retry_failed_extractions_endpoint(): return {"retried": 0, "message": "No extraction-failed documents to retry"} doc_ids = [] + enqueued_set_prefix = f"{QUEUE_PREFIX}:enqueued" for row in rows: doc_type = row["document_type"] if doc_type == "macro_event": @@ -1898,10 +1899,14 @@ async def retry_failed_extractions_endpoint(): else: target = queue_key("extraction") - await rds.rpush(target, json.dumps({ - "document_id": str(row["id"]), - "ticker": row["ticker"] or "", - })) + doc_id = str(row["id"]) + marker = f"{enqueued_set_prefix}:{doc_id}" + added = await rds.set(marker, "1", nx=True, ex=3600) + if added: + await rds.rpush(target, json.dumps({ + "document_id": doc_id, + "ticker": row["ticker"] or "", + })) doc_ids.append(row["id"]) # Delete failed intelligence rows so extractor starts fresh diff --git a/services/scheduler/app.py b/services/scheduler/app.py index 29a80a6..67d3b79 100644 --- a/services/scheduler/app.py +++ b/services/scheduler/app.py @@ -22,6 +22,7 @@ from services.shared.redis_keys import ( QUEUE_EXTRACTION, QUEUE_INGESTION, QUEUE_MACRO_CLASSIFICATION, + QUEUE_PREFIX, lock_key, queue_key, rate_limit_key, @@ -538,6 +539,35 @@ STALE_PARSED_THRESHOLD_MINUTES: int = 30 # How long after an extraction failure before we retry EXTRACTION_FAILED_RETRY_MINUTES: int = 60 +# Redis set key for tracking enqueued doc IDs (prevents duplicate enqueuing) +_ENQUEUED_SET = f"{QUEUE_PREFIX}:enqueued" +# How long an enqueued marker lives before it can be re-enqueued (seconds) +_ENQUEUED_TTL = 3600 + + +async def _enqueue_if_new( + rds: aioredis.Redis, + queue: str, + document_id: str, + ticker: str, +) -> bool: + """Push a job onto *queue* only if *document_id* isn't already tracked. + + Uses a Redis SET with per-member expiry (via a separate key) to prevent + the same document from being enqueued multiple times by recovery sweeps. + Returns True if enqueued, False if skipped as duplicate. + """ + marker_key = f"{_ENQUEUED_SET}:{document_id}" + # SET NX returns True only if the key was created (not already present) + added = await rds.set(marker_key, "1", nx=True, ex=_ENQUEUED_TTL) + if not added: + return False + await rds.rpush(queue, json.dumps({ + "document_id": document_id, + "ticker": ticker, + })) + return True + async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> int: """Re-enqueue documents stuck in 'parsed' status for extraction. @@ -575,12 +605,10 @@ async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> in else: target = queue_key(QUEUE_EXTRACTION) - await rds.rpush(target, json.dumps({ - "document_id": str(row["id"]), - "ticker": row["ticker"] or "", - })) - doc_ids.append(row["id"]) - enqueued += 1 + added = await _enqueue_if_new(rds, target, str(row["id"]), row["ticker"] or "") + if added: + doc_ids.append(row["id"]) + enqueued += 1 # Touch updated_at so these docs won't be re-enqueued until the threshold passes again if doc_ids: @@ -627,12 +655,10 @@ async def retry_failed_extractions(pool: asyncpg.Pool, rds: aioredis.Redis) -> i else: target = queue_key(QUEUE_EXTRACTION) - await rds.rpush(target, json.dumps({ - "document_id": str(row["id"]), - "ticker": row["ticker"] or "", - })) - doc_ids.append(row["id"]) - enqueued += 1 + added = await _enqueue_if_new(rds, target, str(row["id"]), row["ticker"] or "") + if added: + doc_ids.append(row["id"]) + enqueued += 1 if doc_ids: # Delete failed intelligence rows so extractor starts fresh